From 99b070af108932c9833beb5c866f8c2278550015 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 26 Sep 2023 08:56:06 -0500 Subject: [PATCH 1/2] feat: speed up outgoing multicast queue --- build_ext.py | 1 + src/zeroconf/_handlers/answers.pxd | 6 ++--- .../_handlers/multicast_outgoing_queue.py | 26 +++++++++++++------ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/build_ext.py b/build_ext.py index 870c8058e..c431d7484 100644 --- a/build_ext.py +++ b/build_ext.py @@ -31,6 +31,7 @@ def build(setup_kwargs: Any) -> None: "src/zeroconf/_protocol/outgoing.py", "src/zeroconf/_handlers/answers.py", "src/zeroconf/_handlers/record_manager.py", + "src/zeroconf/_handlers/multicast_outgoing_queue.py", "src/zeroconf/_handlers/query_handler.py", "src/zeroconf/_services/browser.py", "src/zeroconf/_services/info.py", diff --git a/src/zeroconf/_handlers/answers.pxd b/src/zeroconf/_handlers/answers.pxd index 6a0f0e3d5..7efc45c70 100644 --- a/src/zeroconf/_handlers/answers.pxd +++ b/src/zeroconf/_handlers/answers.pxd @@ -15,9 +15,9 @@ cdef class QuestionAnswers: cdef class AnswerGroup: - cdef public object send_after - cdef public object send_before - cdef public object answers + cdef public float send_after + cdef public float send_before + cdef public cython.dict answers diff --git a/src/zeroconf/_handlers/multicast_outgoing_queue.py b/src/zeroconf/_handlers/multicast_outgoing_queue.py index 0e469d288..d45940fb3 100644 --- a/src/zeroconf/_handlers/multicast_outgoing_queue.py +++ b/src/zeroconf/_handlers/multicast_outgoing_queue.py @@ -32,9 +32,13 @@ construct_outgoing_multicast_answers, ) +RAND_INT = random.randint + if TYPE_CHECKING: from .._core import Zeroconf +_float = float + class MulticastOutgoingQueue: """An outgoing queue used to aggregate multicast responses.""" @@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_ self.additional_delay = additional_delay self.aggregation_delay = max_aggregation_delay - def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None: + def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None: """Add a group of answers with additionals to the outgoing queue.""" - assert self.zc.loop is not None - random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay + loop = self.zc.loop + if TYPE_CHECKING: + assert loop is not None + random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL) + random_delay = random_int + self.additional_delay send_after = now + random_delay send_before = now + self.aggregation_delay + self.additional_delay if len(self.queue): @@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None: last_group.answers.update(answers) return else: - self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready) + loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready) self.queue.append(AnswerGroup(send_after, send_before, answers)) def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None: @@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non def async_ready(self) -> None: """Process anything in the queue that is ready.""" - assert self.zc.loop is not None + zc = self.zc + loop = zc.loop + if TYPE_CHECKING: + assert loop is not None now = current_time_millis() if len(self.queue) > 1 and self.queue[0].send_before > now: # There is more than one answer in the queue, # delay until we have to send it (first answer group reaches send_before) - self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready) + loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready) return answers: _AnswerWithAdditionalsType = {} @@ -94,9 +104,9 @@ def async_ready(self) -> None: if len(self.queue): # If there are still groups in the queue that are not ready to send # be sure we schedule them to go out later - self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready) + loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready) if answers: # If we have the same answer scheduled to go out, remove them self._remove_answers_from_queue(answers) - self.zc.async_send(construct_outgoing_multicast_answers(answers)) + zc.async_send(construct_outgoing_multicast_answers(answers)) From 2838f3a7024b7530225f253fcdc5c4d8b71bb35f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 26 Sep 2023 08:57:33 -0500 Subject: [PATCH 2/2] feat: speed up outgoing multicast queue --- .../_handlers/multicast_outgoing_queue.pxd | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 src/zeroconf/_handlers/multicast_outgoing_queue.pxd diff --git a/src/zeroconf/_handlers/multicast_outgoing_queue.pxd b/src/zeroconf/_handlers/multicast_outgoing_queue.pxd new file mode 100644 index 000000000..ff01ce54f --- /dev/null +++ b/src/zeroconf/_handlers/multicast_outgoing_queue.pxd @@ -0,0 +1,25 @@ + +import cython + +from .._utils.time cimport current_time_millis, millis_to_seconds +from .answers cimport AnswerGroup, construct_outgoing_multicast_answers + + +cdef object TYPE_CHECKING +cdef tuple MULTICAST_DELAY_RANDOM_INTERVAL +cdef object RAND_INT + +cdef class MulticastOutgoingQueue: + + cdef object zc + cdef object queue + cdef cython.uint additional_delay + cdef cython.uint aggregation_delay + + @cython.locals(last_group=AnswerGroup, random_int=cython.uint, random_delay=float, send_after=float, send_before=float) + cpdef async_add(self, float now, cython.dict answers) + + @cython.locals(pending=AnswerGroup) + cdef _remove_answers_from_queue(self, cython.dict answers) + + cpdef async_ready(self)