Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def build(setup_kwargs: Any) -> None:
"src/zeroconf/_protocol/outgoing.py",
"src/zeroconf/_handlers/answers.py",
"src/zeroconf/_handlers/record_manager.py",
"src/zeroconf/_handlers/multicast_outgoing_queue.py",
"src/zeroconf/_handlers/query_handler.py",
"src/zeroconf/_services/browser.py",
"src/zeroconf/_services/info.py",
Expand Down
6 changes: 3 additions & 3 deletions src/zeroconf/_handlers/answers.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ cdef class QuestionAnswers:

cdef class AnswerGroup:

cdef public object send_after
cdef public object send_before
cdef public object answers
cdef public float send_after
cdef public float send_before
cdef public cython.dict answers



Expand Down
25 changes: 25 additions & 0 deletions src/zeroconf/_handlers/multicast_outgoing_queue.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

import cython

from .._utils.time cimport current_time_millis, millis_to_seconds
from .answers cimport AnswerGroup, construct_outgoing_multicast_answers


cdef object TYPE_CHECKING
cdef tuple MULTICAST_DELAY_RANDOM_INTERVAL
cdef object RAND_INT

cdef class MulticastOutgoingQueue:

cdef object zc
cdef object queue
cdef cython.uint additional_delay
cdef cython.uint aggregation_delay

@cython.locals(last_group=AnswerGroup, random_int=cython.uint, random_delay=float, send_after=float, send_before=float)
cpdef async_add(self, float now, cython.dict answers)

@cython.locals(pending=AnswerGroup)
cdef _remove_answers_from_queue(self, cython.dict answers)

cpdef async_ready(self)
26 changes: 18 additions & 8 deletions src/zeroconf/_handlers/multicast_outgoing_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
construct_outgoing_multicast_answers,
)

RAND_INT = random.randint

if TYPE_CHECKING:
from .._core import Zeroconf

_float = float


class MulticastOutgoingQueue:
"""An outgoing queue used to aggregate multicast responses."""
Expand All @@ -50,10 +54,13 @@ def __init__(self, zeroconf: 'Zeroconf', additional_delay: int, max_aggregation_
self.additional_delay = additional_delay
self.aggregation_delay = max_aggregation_delay

def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:
def async_add(self, now: _float, answers: _AnswerWithAdditionalsType) -> None:
"""Add a group of answers with additionals to the outgoing queue."""
assert self.zc.loop is not None
random_delay = random.randint(*MULTICAST_DELAY_RANDOM_INTERVAL) + self.additional_delay
loop = self.zc.loop
if TYPE_CHECKING:
assert loop is not None
random_int = RAND_INT(*MULTICAST_DELAY_RANDOM_INTERVAL)
random_delay = random_int + self.additional_delay
send_after = now + random_delay
send_before = now + self.aggregation_delay + self.additional_delay
if len(self.queue):
Expand All @@ -66,7 +73,7 @@ def async_add(self, now: float, answers: _AnswerWithAdditionalsType) -> None:
last_group.answers.update(answers)
return
else:
self.zc.loop.call_later(millis_to_seconds(random_delay), self.async_ready)
loop.call_at(loop.time() + millis_to_seconds(random_delay), self.async_ready)
self.queue.append(AnswerGroup(send_after, send_before, answers))

def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> None:
Expand All @@ -77,13 +84,16 @@ def _remove_answers_from_queue(self, answers: _AnswerWithAdditionalsType) -> Non

def async_ready(self) -> None:
"""Process anything in the queue that is ready."""
assert self.zc.loop is not None
zc = self.zc
loop = zc.loop
if TYPE_CHECKING:
assert loop is not None
now = current_time_millis()

if len(self.queue) > 1 and self.queue[0].send_before > now:
# There is more than one answer in the queue,
# delay until we have to send it (first answer group reaches send_before)
self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_before - now), self.async_ready)
loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_before - now), self.async_ready)
return

answers: _AnswerWithAdditionalsType = {}
Expand All @@ -94,9 +104,9 @@ def async_ready(self) -> None:
if len(self.queue):
# If there are still groups in the queue that are not ready to send
# be sure we schedule them to go out later
self.zc.loop.call_later(millis_to_seconds(self.queue[0].send_after - now), self.async_ready)
loop.call_at(loop.time() + millis_to_seconds(self.queue[0].send_after - now), self.async_ready)

if answers:
# If we have the same answer scheduled to go out, remove them
self._remove_answers_from_queue(answers)
self.zc.async_send(construct_outgoing_multicast_answers(answers))
zc.async_send(construct_outgoing_multicast_answers(answers))