Skip to content

Commit 9399c57

Browse files
authored
Centralize running coroutines from threads (#906)
- Cleanup to ensure all coros we run from a thread use _LOADED_SYSTEM_TIMEOUT
1 parent e417fc0 commit 9399c57

3 files changed

Lines changed: 31 additions & 21 deletions

File tree

zeroconf/_core.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,13 @@
4343
from ._services.info import ServiceInfo, instance_name_from_service_info
4444
from ._services.registry import ServiceRegistry
4545
from ._updates import RecordUpdate, RecordUpdateListener
46-
from ._utils.asyncio import await_awaitable, get_running_loop, shutdown_loop, wait_event_or_timeout
46+
from ._utils.asyncio import (
47+
await_awaitable,
48+
get_running_loop,
49+
run_coro_with_timeout,
50+
shutdown_loop,
51+
wait_event_or_timeout,
52+
)
4753
from ._utils.name import service_type_name
4854
from ._utils.net import (
4955
IPVersion,
@@ -62,7 +68,6 @@
6268
_FLAGS_AA,
6369
_FLAGS_QR_QUERY,
6470
_FLAGS_QR_RESPONSE,
65-
_LOADED_SYSTEM_TIMEOUT,
6671
_MAX_MSG_ABSOLUTE,
6772
_MDNS_ADDR,
6873
_MDNS_ADDR6,
@@ -73,7 +78,7 @@
7378
)
7479

7580
_TC_DELAY_RANDOM_INTERVAL = (400, 500)
76-
_CLOSE_TIMEOUT = 3
81+
_CLOSE_TIMEOUT = 3000 # ms
7782
_REGISTER_BROADCASTS = 3
7883

7984

@@ -174,9 +179,7 @@ def close(self) -> None:
174179
return
175180
if not self.loop.is_running():
176181
return
177-
asyncio.run_coroutine_threadsafe(self._async_close(), self.loop).result(
178-
_CLOSE_TIMEOUT + _LOADED_SYSTEM_TIMEOUT
179-
)
182+
run_coro_with_timeout(self._async_close(), self.loop, _CLOSE_TIMEOUT)
180183

181184

182185
class AsyncListener(asyncio.Protocol, QuietLogger):
@@ -486,12 +489,13 @@ def register_service(
486489
can register the same service on the network for resilience
487490
(if you want this behavior set `cooperating_responders` to `True`)."""
488491
assert self.loop is not None
489-
asyncio.run_coroutine_threadsafe(
492+
run_coro_with_timeout(
490493
await_awaitable(
491494
self.async_register_service(info, ttl, allow_name_change, cooperating_responders)
492495
),
493496
self.loop,
494-
).result(millis_to_seconds(_REGISTER_TIME * _REGISTER_BROADCASTS) + _LOADED_SYSTEM_TIMEOUT)
497+
_REGISTER_TIME * _REGISTER_BROADCASTS,
498+
)
495499

496500
async def async_register_service(
497501
self,
@@ -522,8 +526,8 @@ def update_service(self, info: ServiceInfo) -> None:
522526
Zeroconf will then respond to requests for information for that
523527
service."""
524528
assert self.loop is not None
525-
asyncio.run_coroutine_threadsafe(await_awaitable(self.async_update_service(info)), self.loop).result(
526-
millis_to_seconds(_REGISTER_TIME * _REGISTER_BROADCASTS) + _LOADED_SYSTEM_TIMEOUT
529+
run_coro_with_timeout(
530+
await_awaitable(self.async_update_service(info)), self.loop, _REGISTER_TIME * _REGISTER_BROADCASTS
527531
)
528532

529533
async def async_update_service(self, info: ServiceInfo) -> Awaitable:
@@ -577,9 +581,9 @@ def _add_broadcast_answer( # pylint: disable=no-self-use
577581
def unregister_service(self, info: ServiceInfo) -> None:
578582
"""Unregister a service."""
579583
assert self.loop is not None
580-
asyncio.run_coroutine_threadsafe(
581-
await_awaitable(self.async_unregister_service(info)), self.loop
582-
).result(millis_to_seconds(_UNREGISTER_TIME * _REGISTER_BROADCASTS) + _LOADED_SYSTEM_TIMEOUT)
584+
run_coro_with_timeout(
585+
self.async_unregister_service(info), self.loop, _UNREGISTER_TIME * _REGISTER_BROADCASTS
586+
)
583587

584588
async def async_unregister_service(self, info: ServiceInfo) -> Awaitable:
585589
"""Unregister a service."""

zeroconf/_services/info.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
USA
2121
"""
2222

23-
import asyncio
2423
import ipaddress
2524
import socket
2625
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union, cast
@@ -29,23 +28,22 @@
2928
from .._exceptions import BadTypeInNameException
3029
from .._protocol import DNSOutgoing
3130
from .._updates import RecordUpdate, RecordUpdateListener
32-
from .._utils.asyncio import get_running_loop
31+
from .._utils.asyncio import get_running_loop, run_coro_with_timeout
3332
from .._utils.name import service_type_name
3433
from .._utils.net import (
3534
IPVersion,
3635
_encode_address,
3736
_is_v6_address,
3837
)
3938
from .._utils.struct import int2byte
40-
from .._utils.time import current_time_millis, millis_to_seconds
39+
from .._utils.time import current_time_millis
4140
from ..const import (
4241
_CLASS_IN,
4342
_CLASS_UNIQUE,
4443
_DNS_HOST_TTL,
4544
_DNS_OTHER_TTL,
4645
_FLAGS_QR_QUERY,
4746
_LISTENER_TIME,
48-
_LOADED_SYSTEM_TIMEOUT,
4947
_TYPE_A,
5048
_TYPE_AAAA,
5149
_TYPE_PTR,
@@ -426,9 +424,7 @@ def request(
426424
assert zc.loop is not None and zc.loop.is_running()
427425
if zc.loop == get_running_loop():
428426
raise RuntimeError("Use AsyncServiceInfo.async_request from the event loop")
429-
return asyncio.run_coroutine_threadsafe(
430-
self.async_request(zc, timeout, question_type), zc.loop
431-
).result(millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT)
427+
return bool(run_coro_with_timeout(self.async_request(zc, timeout, question_type), zc.loop, timeout))
432428

433429
async def async_request(
434430
self, zc: 'Zeroconf', timeout: float, question_type: Optional[DNSQuestionType] = None

zeroconf/_utils/asyncio.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import asyncio
2424
import contextlib
2525
import queue
26-
from typing import Any, Awaitable, List, Optional, Set, cast
26+
from typing import Any, Awaitable, Coroutine, List, Optional, Set, cast
27+
28+
from .time import millis_to_seconds
29+
from ..const import _LOADED_SYSTEM_TIMEOUT
2730

2831
# The combined timeouts should be lower than _CLOSE_TIMEOUT + _WAIT_FOR_LOOP_TASKS_TIMEOUT
2932
_TASK_AWAIT_TIMEOUT = 1
@@ -87,6 +90,13 @@ async def await_awaitable(aw: Awaitable) -> None:
8790
await task
8891

8992

93+
def run_coro_with_timeout(aw: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float) -> Any:
94+
"""Run a coroutine with a timeout."""
95+
return asyncio.run_coroutine_threadsafe(aw, loop).result(
96+
millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT
97+
)
98+
99+
90100
def shutdown_loop(loop: asyncio.AbstractEventLoop) -> None:
91101
"""Wait for pending tasks and stop an event loop."""
92102
pending_tasks = set(

0 commit comments

Comments
 (0)