Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3277,6 +3277,7 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
self._errbacks = []
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
self.attempted_hosts = []
self._start_timer()

def _start_timer(self):
if self._timer is None:
Expand Down Expand Up @@ -3331,11 +3332,6 @@ def send_request(self, error_no_hosts=True):
req_id = self._query(host)
if req_id is not None:
self._req_id = req_id

# timer is only started here, after we have at least one message queued
# this is done to avoid overrun of timers with unfettered client requests
# in the case of full disconnect, where no hosts will be available
self._start_timer()
return True
if self.timeout is not None and time.time() - self._start_time > self.timeout:
self._on_timeout()
Expand Down Expand Up @@ -3454,7 +3450,7 @@ def start_fetching_next_page(self):
self._event.clear()
self._final_result = _NOT_SET
self._final_exception = None
self._timer = None # clear cancelled timer; new one will be set when request is queued
self._start_timer()
self.send_request()

def _reprepare(self, prepare_message, host, connection, pool):
Expand Down Expand Up @@ -3612,6 +3608,7 @@ def _set_result(self, host, connection, pool, response):
# we got some other kind of response message
msg = "Got unexpected message: %r" % (response,)
exc = ConnectionException(msg, host)
self._cancel_timer()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if this line is really necessary, as the timer will be canceled inside inside
self._set_final_exception(exc)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal is to cancel as quickly as possible -- without waiting for callbacks to execute in defunct below -- to avoid a pileup of timers when clients don't back off requests on a failing connection as described in PYTHON-367. Do you think this accomplishes that goal?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes sense

self._connection.defunct(exc)
self._set_final_exception(exc)
except Exception as exc:
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def test_default_legacy(self):
self._verify_response_future_profile(rf, expected_profile)

def test_default_profile(self):
non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)])
non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)])
cluster = Cluster(execution_profiles={'non-default': non_default_profile})
session = Session(cluster, hosts=[])

Expand Down Expand Up @@ -218,7 +218,7 @@ def test_statement_params_override_legacy(self):
self._verify_response_future_profile(rf, expected_profile)

def test_statement_params_override_profile(self):
non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)])
non_default_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)])
cluster = Cluster(execution_profiles={'non-default': non_default_profile})
session = Session(cluster, hosts=[])

Expand Down Expand Up @@ -284,15 +284,15 @@ def test_no_legacy_with_profile(self):

def test_profile_name_value(self):

internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)])
internalized_profile = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)])
cluster = Cluster(execution_profiles={'by-name': internalized_profile})
session = Session(cluster, hosts=[])
self.assertEqual(cluster._config_mode, _ConfigMode.PROFILES)

rf = session.execute_async("query", execution_profile='by-name')
self._verify_response_future_profile(rf, internalized_profile)

by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(5)])
by_value = ExecutionProfile(RoundRobinPolicy(), *[object() for _ in range(3)])
rf = session.execute_async("query", execution_profile=by_value)
self._verify_response_future_profile(rf, by_value)

Expand Down