diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 18c1734474..cc5cd78a1c 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3277,6 +3277,7 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat self._errbacks = [] self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan self.attempted_hosts = [] + self._start_timer() def _start_timer(self): if self._timer is None: @@ -3331,11 +3332,6 @@ def send_request(self, error_no_hosts=True): req_id = self._query(host) if req_id is not None: self._req_id = req_id - - # timer is only started here, after we have at least one message queued - # this is done to avoid overrun of timers with unfettered client requests - # in the case of full disconnect, where no hosts will be available - self._start_timer() return True if self.timeout is not None and time.time() - self._start_time > self.timeout: self._on_timeout() @@ -3454,7 +3450,7 @@ def start_fetching_next_page(self): self._event.clear() self._final_result = _NOT_SET self._final_exception = None - self._timer = None # clear cancelled timer; new one will be set when request is queued + self._start_timer() self.send_request() def _reprepare(self, prepare_message, host, connection, pool): @@ -3612,6 +3608,7 @@ def _set_result(self, host, connection, pool, response): # we got some other kind of response message msg = "Got unexpected message: %r" % (response,) exc = ConnectionException(msg, host) + self._cancel_timer() self._connection.defunct(exc) self._set_final_exception(exc) except Exception as exc: diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 279b0f94a1..b3337f6c4d 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -184,7 +184,7 @@ def test_default_legacy(self): self._verify_response_future_profile(rf, expected_profile) def test_default_profile(self): - non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'non-default': non_default_profile}) session = Session(cluster, hosts=[]) @@ -218,7 +218,7 @@ def test_statement_params_override_legacy(self): self._verify_response_future_profile(rf, expected_profile) def test_statement_params_override_profile(self): - non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'non-default': non_default_profile}) session = Session(cluster, hosts=[]) @@ -284,7 +284,7 @@ def test_no_legacy_with_profile(self): def test_profile_name_value(self): - internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) cluster = Cluster(execution_profiles={'by-name': internalized_profile}) session = Session(cluster, hosts=[]) self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES) @@ -292,7 +292,7 @@ def test_profile_name_value(self): rf = session.execute_async("query", execution_profile='by-name') self._verify_response_future_profile(rf, internalized_profile) - by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)]) + by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)]) rf = session.execute_async("query", execution_profile=by_value) self._verify_response_future_profile(rf, by_value)