Skip to content

Commit 51dd366

Browse files
committed
connection: clean up failed heartbeat sends
Keep heartbeat request-id and in-flight bookkeeping consistent when send_msg() fails.

Handle the control-connection in_flight release separately from HostConnection cleanup.
1 parent ef7c2d0 commit 51dd366

2 files changed

Lines changed: 39 additions & 2 deletions

File tree

cassandra/connection.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1816,7 +1816,19 @@ def __init__(self, connection, owner):
18161816
with connection.lock:
18171817
if connection.in_flight < connection.max_request_id:
18181818
connection.in_flight += 1
1819-
connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
1819+
request_id = connection.get_request_id()
1820+
try:
1821+
connection.send_msg(OptionsMessage(), request_id, self._options_callback)
1822+
except Exception as exc:
1823+
if connection.is_control_connection:
1824+
connection.in_flight -= 1
1825+
# send_msg() registers the callback before writing to the socket,
1826+
# so a write failure must unwind that registration here.
1827+
connection._requests.pop(request_id, None)
1828+
if request_id not in connection.request_ids:
1829+
connection.request_ids.append(request_id)
1830+
self._exception = exc
1831+
self._event.set()
18201832
else:
18211833
self._exception = Exception("Failed to send heartbeat because connection 'in_flight' exceeds threshold")
18221834
self._event.set()

tests/unit/test_connection.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from cassandra import OperationTimedOut
2222
from cassandra.cluster import Cluster
2323
from cassandra.connection import (Connection, HEADER_DIRECTION_TO_CLIENT, ProtocolError,
24-
locally_supported_compressions, ConnectionHeartbeat, _Frame, Timer, TimerManager,
24+
locally_supported_compressions, ConnectionHeartbeat, HeartbeatFuture, _Frame, Timer, TimerManager,
2525
ConnectionException, ConnectionShutdown, DefaultEndPoint, ShardAwarePortGenerator)
2626
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
2727
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
@@ -463,6 +463,31 @@ def test_no_req_ids(self, *args):
463463
holder.return_connection.assert_has_calls(
464464
[call(max_connection)] * get_holders.call_count)
465465

466+
def test_heartbeat_future_releases_request_id_when_send_fails(self, *args):
467+
connection = Connection(DefaultEndPoint('1.2.3.4'))
468+
connection.push = Mock(side_effect=ConnectionException("write failed"))
469+
owner = Mock()
470+
initial_in_flight = connection.in_flight
471+
initial_request_ids = len(connection.request_ids)
472+
473+
# HostConnection.return_connection releases the heartbeat's in-flight slot.
474+
def return_connection(conn):
475+
with conn.lock:
476+
conn.in_flight -= 1
477+
478+
owner.return_connection.side_effect = return_connection
479+
480+
future = HeartbeatFuture(connection, owner)
481+
482+
with pytest.raises(ConnectionException):
483+
future.wait(0)
484+
485+
owner.return_connection(connection)
486+
487+
assert connection.in_flight == initial_in_flight
488+
assert len(connection.request_ids) == initial_request_ids
489+
assert not connection._requests
490+
466491
def test_unexpected_response(self, *args):
467492
request_id = 999
468493

0 commit comments

Comments
 (0)