Skip to content

Commit 0b21341

Browse files
committed
add RETRY_NEXT_HOST decision, default to this on unavailable
PYTHON-285
1 parent 613c005 commit 0b21341

4 files changed

Lines changed: 41 additions & 33 deletions

File tree

cassandra/cluster.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3058,15 +3058,17 @@ def _set_result(self, response):
30583058
return
30593059

30603060
retry_type, consistency = retry
3061-
if retry_type is RetryPolicy.RETRY:
3061+
if retry_type in (RetryPolicy.RETRY, RetryPolicy.RETRY_NEXT_HOST):
30623062
self._query_retries += 1
3063-
self._retry(reuse_connection=True, consistency_level=consistency)
3063+
reuse = retry_type == RetryPolicy.RETRY
3064+
self._retry(reuse_connection=reuse, consistency_level=consistency)
30643065
elif retry_type is RetryPolicy.RETHROW:
30653066
self._set_final_exception(response.to_exception())
30663067
else: # IGNORE
30673068
if self._metrics is not None:
30683069
self._metrics.on_ignore()
30693070
self._set_final_result(None)
3071+
self._errors[self._current_host] = response.to_exception()
30703072
elif isinstance(response, ConnectionException):
30713073
if self._metrics is not None:
30723074
self._metrics.on_connection_error()

cassandra/policies.py

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -551,8 +551,8 @@ def __init__(self, base_delay, max_delay, max_attempts=64):
551551
self.max_attempts = max_attempts
552552

553553
def new_schedule(self):
554-
i=0
555-
while self.max_attempts == None or i < self.max_attempts:
554+
i = 0
555+
while self.max_attempts is None or i < self.max_attempts:
556556
yield min(self.base_delay * (2 ** i), self.max_delay)
557557
i += 1
558558

@@ -647,6 +647,12 @@ class or one of its subclasses.
647647
should be ignored but no more retries should be attempted.
648648
"""
649649

650+
RETRY_NEXT_HOST = 3
651+
"""
652+
This should be returned from the below methods if the operation
653+
should be retried on another connection.
654+
"""
655+
650656
def on_read_timeout(self, query, consistency, required_responses,
651657
received_responses, data_retrieved, retry_num):
652658
"""
@@ -674,11 +680,11 @@ def on_read_timeout(self, query, consistency, required_responses,
674680
a sufficient number of replicas responded (with data digests).
675681
"""
676682
if retry_num != 0:
677-
return (self.RETHROW, None)
683+
return self.RETHROW, None
678684
elif received_responses >= required_responses and not data_retrieved:
679-
return (self.RETRY, consistency)
685+
return self.RETRY, consistency
680686
else:
681-
return (self.RETHROW, None)
687+
return self.RETHROW, None
682688

683689
def on_write_timeout(self, query, consistency, write_type,
684690
required_responses, received_responses, retry_num):
@@ -707,11 +713,11 @@ def on_write_timeout(self, query, consistency, write_type,
707713
:attr:`~.WriteType.BATCH_LOG`.
708714
"""
709715
if retry_num != 0:
710-
return (self.RETHROW, None)
716+
return self.RETHROW, None
711717
elif write_type == WriteType.BATCH_LOG:
712-
return (self.RETRY, consistency)
718+
return self.RETRY, consistency
713719
else:
714-
return (self.RETHROW, None)
720+
return self.RETHROW, None
715721

716722
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
717723
"""
@@ -736,7 +742,7 @@ def on_unavailable(self, query, consistency, required_replicas, alive_replicas,
736742
737743
By default, no retries will be attempted and the error will be re-raised.
738744
"""
739-
return (self.RETHROW, None)
745+
return (self.RETRY_NEXT_HOST, consistency) if retry_num == 0 else (self.RETHROW, None)
740746

741747

742748
class FallthroughRetryPolicy(RetryPolicy):
@@ -746,13 +752,13 @@ class FallthroughRetryPolicy(RetryPolicy):
746752
"""
747753

748754
def on_read_timeout(self, *args, **kwargs):
749-
return (self.RETHROW, None)
755+
return self.RETHROW, None
750756

751757
def on_write_timeout(self, *args, **kwargs):
752-
return (self.RETHROW, None)
758+
return self.RETHROW, None
753759

754760
def on_unavailable(self, *args, **kwargs):
755-
return (self.RETHROW, None)
761+
return self.RETHROW, None
756762

757763

758764
class DowngradingConsistencyRetryPolicy(RetryPolicy):
@@ -804,46 +810,46 @@ class DowngradingConsistencyRetryPolicy(RetryPolicy):
804810
"""
805811
def _pick_consistency(self, num_responses):
806812
if num_responses >= 3:
807-
return (self.RETRY, ConsistencyLevel.THREE)
813+
return self.RETRY, ConsistencyLevel.THREE
808814
elif num_responses >= 2:
809-
return (self.RETRY, ConsistencyLevel.TWO)
815+
return self.RETRY, ConsistencyLevel.TWO
810816
elif num_responses >= 1:
811-
return (self.RETRY, ConsistencyLevel.ONE)
817+
return self.RETRY, ConsistencyLevel.ONE
812818
else:
813-
return (self.RETHROW, None)
819+
return self.RETHROW, None
814820

815821
def on_read_timeout(self, query, consistency, required_responses,
816822
received_responses, data_retrieved, retry_num):
817823
if retry_num != 0:
818-
return (self.RETHROW, None)
824+
return self.RETHROW, None
819825
elif received_responses < required_responses:
820826
return self._pick_consistency(received_responses)
821827
elif not data_retrieved:
822-
return (self.RETRY, consistency)
828+
return self.RETRY, consistency
823829
else:
824-
return (self.RETHROW, None)
830+
return self.RETHROW, None
825831

826832
def on_write_timeout(self, query, consistency, write_type,
827833
required_responses, received_responses, retry_num):
828834
if retry_num != 0:
829-
return (self.RETHROW, None)
835+
return self.RETHROW, None
830836

831837
if write_type in (WriteType.SIMPLE, WriteType.BATCH, WriteType.COUNTER):
832838
if received_responses > 0:
833839
# persisted on at least one replica
834-
return (self.IGNORE, None)
840+
return self.IGNORE, None
835841
else:
836-
return (self.RETHROW, None)
842+
return self.RETHROW, None
837843
elif write_type == WriteType.UNLOGGED_BATCH:
838844
return self._pick_consistency(received_responses)
839845
elif write_type == WriteType.BATCH_LOG:
840-
return (self.RETRY, consistency)
846+
return self.RETRY, consistency
841847

842-
return (self.RETHROW, None)
848+
return self.RETHROW, None
843849

844850
def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
845851
if retry_num != 0:
846-
return (self.RETHROW, None)
852+
return self.RETHROW, None
847853
else:
848854
return self._pick_consistency(alive_replicas)
849855

tests/integration/standard/test_metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,13 @@ def test_unavailable(self):
145145
query = SimpleStatement("INSERT INTO test (k, v) VALUES (2, 2)", consistency_level=ConsistencyLevel.ALL)
146146
with self.assertRaises(Unavailable):
147147
self.session.execute(query)
148-
self.assertEqual(1, self.cluster.metrics.stats.unavailables)
148+
self.assertEqual(2, self.cluster.metrics.stats.unavailables)
149149

150150
# Test write
151151
query = SimpleStatement("SELECT * FROM test", consistency_level=ConsistencyLevel.ALL)
152152
with self.assertRaises(Unavailable):
153153
self.session.execute(query, timeout=None)
154-
self.assertEqual(2, self.cluster.metrics.stats.unavailables)
154+
self.assertEqual(4, self.cluster.metrics.stats.unavailables)
155155
finally:
156156
get_node(1).start(wait_other_notice=True, wait_for_binary_proto=True)
157157
# Give some time for the cluster to come back up, for the next test

tests/unit/test_policies.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -916,14 +916,14 @@ def test_unavailable(self):
916916
retry, consistency = policy.on_unavailable(
917917
query=None, consistency=ONE,
918918
required_replicas=1, alive_replicas=2, retry_num=0)
919-
self.assertEqual(retry, RetryPolicy.RETHROW)
920-
self.assertEqual(consistency, None)
919+
self.assertEqual(retry, RetryPolicy.RETRY_NEXT_HOST)
920+
self.assertEqual(consistency, ONE)
921921

922922
retry, consistency = policy.on_unavailable(
923923
query=None, consistency=ONE,
924924
required_replicas=10000, alive_replicas=1, retry_num=0)
925-
self.assertEqual(retry, RetryPolicy.RETHROW)
926-
self.assertEqual(consistency, None)
925+
self.assertEqual(retry, RetryPolicy.RETRY_NEXT_HOST)
926+
self.assertEqual(consistency, ONE)
927927

928928

929929
class FallthroughRetryPolicyTest(unittest.TestCase):

0 commit comments

Comments
 (0)