Skip to content

Commit 4c44321

Browse files
committed
Avoid conn leak in prepared statements, better err handling
If a prepared statement was not recognized by the server (for example, if it was restarted), connections were not being properly returned to the pool. Additionally, the re-submission of the original query after the preparation completed was not borrowing connections properly (by finding the least loaded conn and incrementing its in_flight count). Last, if there was a connection error while preparing an unrecognized prepared statement, that error was raised directly instead of retrying the operation on other hosts.
1 parent 3bdcb0d commit 4c44321

2 files changed

Lines changed: 21 additions & 7 deletions

File tree

cassandra/cluster.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1681,11 +1681,17 @@ def _query(self, host):
16811681
self._errors[host] = ConnectionException("Pool is shutdown")
16821682
return None
16831683

1684+
return self._borrow_conn_and_send_message(host, pool, self.message, self._set_result)
1685+
1686+
def _borrow_conn_and_send_message(self, host, pool, message, cb):
1687+
if cb is None:
1688+
cb = self._set_result
1689+
16841690
connection = None
16851691
try:
16861692
# TODO get connectTimeout from cluster settings
16871693
connection = pool.borrow_connection(timeout=2.0)
1688-
request_id = connection.send_msg(self.message, cb=self._set_result)
1694+
request_id = connection.send_msg(message, cb=cb)
16891695
self._current_host = host
16901696
self._current_pool = pool
16911697
self._connection = connection
@@ -1791,9 +1797,11 @@ def _set_result(self, response):
17911797
prepare_message = PrepareMessage(query=prepared_statement.query_string)
17921798
# since this might block, run on the executor to avoid hanging
17931799
# the event loop thread
1794-
self.session.submit(self._connection.send_msg,
1800+
self.session.submit(self._borrow_conn_and_send_message,
1801+
self._current_host,
1802+
self._current_pool,
17951803
prepare_message,
1796-
cb=self._execute_after_prepare)
1804+
self._execute_after_prepare)
17971805
return
17981806
else:
17991807
if hasattr(response, 'to_exception'):
@@ -1846,8 +1854,8 @@ def _execute_after_prepare(self, response):
18461854
Handle the response to our attempt to prepare a statement.
18471855
If it succeeded, run the original query again against the same host.
18481856
"""
1849-
if self._final_exception:
1850-
return
1857+
if self._current_pool and self._connection:
1858+
self._current_pool.return_connection(self._connection)
18511859

18521860
if isinstance(response, ResultMessage):
18531861
if response.kind == ResultMessage.KIND_PREPARED:
@@ -1860,6 +1868,12 @@ def _execute_after_prepare(self, response):
18601868
"on host %s: %s" % (self._current_host, response)))
18611869
elif isinstance(response, ErrorMessage):
18621870
self._set_final_exception(response)
1871+
elif isinstance(response, ConnectionException):
1872+
log.debug("Connection error when preparing statement on host %s: %s",
1873+
self._current_host, response)
1874+
# try again on a different host, preparing again if necessary
1875+
self._errors[self._current_host] = response
1876+
self.send_request()
18631877
else:
18641878
self._set_final_exception(ConnectionException(
18651879
"Got unexpected response type when preparing "

tests/unit/test_response_future.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@ def test_prepared_query_not_found(self):
366366

367367
session.submit.assert_called_once()
368368
args, kwargs = session.submit.call_args
369-
self.assertIsInstance(args[-1], PrepareMessage)
370-
self.assertEquals(args[-1].query, "SELECT * FROM foobar")
369+
self.assertIsInstance(args[-2], PrepareMessage)
370+
self.assertEquals(args[-2].query, "SELECT * FROM foobar")
371371

372372
def test_prepared_query_not_found_bad_keyspace(self):
373373
session = self.make_session()

0 commit comments

Comments
 (0)