From 8b106c31fd4f214dd15018adc7b24648cfd6b104 Mon Sep 17 00:00:00 2001 From: Jim Witschey Date: Mon, 16 Jan 2017 12:17:05 -0500 Subject: [PATCH 1/2] Revert "Only start timer if request is actually issued." This reverts commit 1a01f118240e5d9af009653cfff80cdb67ee099 for PYTHON-644. The "create timer on request" strategy can let timers leak. --- cassandra/cluster.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 18c1734474..e225ef24a1 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3277,6 +3277,7 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat self._errbacks = [] self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan self.attempted_hosts = [] + self._start_timer() def _start_timer(self): if self._timer is None: @@ -3331,11 +3332,6 @@ def send_request(self, error_no_hosts=True): req_id = self._query(host) if req_id is not None: self._req_id = req_id - - # timer is only started here, after we have at least one message queued - # this is done to avoid overrun of timers with unfettered client requests - # in the case of full disconnect, where no hosts will be available - self._start_timer() return True if self.timeout is not None and time.time() - self._start_time > self.timeout: self._on_timeout() @@ -3454,7 +3450,7 @@ def start_fetching_next_page(self): self._event.clear() self._final_result = _NOT_SET self._final_exception = None - self._timer = None # clear cancelled timer; new one will be set when request is queued + self._start_timer() self.send_request() def _reprepare(self, prepare_message, host, connection, pool): From 8aeb926644bcc368d446feee4b3a4e55dd22d673 Mon Sep 17 00:00:00 2001 From: Jim Witschey Date: Mon, 16 Jan 2017 16:05:01 -0500 Subject: [PATCH 2/2] cancel timers before defuncting connection This allows us to revert some of the fix to PYTHON-367 without the timer heap growing out of control as described on that ticket. We cancel before defuncting to ensure that the timer doesn't hang around while running callbacks. Between this change and the delegation of callbacks to a secondary thread (see commit 51923029, part of the original PYTHON-367 fix), this commit should not cause a regression on PYTHON-367. Note that we don't cancel timers before defuncting and retrying -- we're not done with the logical request and still want that timer to fire. Now that timers are started on ResponseFuture initialization, we actually use the value of timeout in the tests, so we no longer pass that argument, allowing the test to use the default value. --- cassandra/cluster.py | 1 + tests/unit/test_cluster.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index e225ef24a1..cc5cd78a1c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3608,6 +3608,7 @@ def _set_result(self, host, connection, pool, response): # we got some other kind of response message msg = "Got unexpected message: %r" % (response,) exc = ConnectionException(msg, host) + self._cancel_timer() self._connection.defunct(exc) self._set_final_exception(exc) except Exception as exc: diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 279b0f94a1..b3337f6c4d 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -184,7 +184,7 @@ def test_default_legacy(self): self._verify_response_future_profile(rf, expected_profile) def test_default_profile(self): - non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'non-default': non_default_profile}) session = Session(cluster, hosts=[]) @@ -218,7 +218,7 @@ def test_statement_params_override_legacy(self): self._verify_response_future_profile(rf, expected_profile) def test_statement_params_override_profile(self): - non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'non-default': non_default_profile}) session = Session(cluster, hosts=[]) @@ -284,7 +284,7 @@ def test_no_legacy_with_profile(self): def test_profile_name_value(self): - internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'by-name': internalized_profile}) session = Session(cluster, hosts=[]) self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES) @@ -292,7 +292,7 @@ def test_profile_name_value(self): rf = session.execute_async("query", execution_profile='by-name') self._verify_response_future_profile(rf, internalized_profile) - by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) rf = session.execute_async("query", execution_profile=by_value) self._verify_response_future_profile(rf, by_value)