Skip to content

Commit aaccf2f

Browse files
authored
Merge pull request apache#873 from datastax/python-836
PYTHON-836: Added lock to query plan for speculative execution
2 parents bf21403 + 1cf623f commit aaccf2f

2 files changed

Lines changed: 19 additions & 2 deletions

File tree

cassandra/cluster.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3460,12 +3460,12 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
34603460
self.prepared_statement = prepared_statement
34613461
self._callback_lock = Lock()
34623462
self._start_time = start_time or time.time()
3463+
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
34633464
self._make_query_plan()
34643465
self._event = Event()
34653466
self._errors = {}
34663467
self._callbacks = []
34673468
self._errbacks = []
3468-
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
34693469
self.attempted_hosts = []
34703470
self._start_timer()
34713471

@@ -3534,6 +3534,10 @@ def _make_query_plan(self):
35343534
# calls to send_request (which retries may do) will resume where
35353535
# they last left off
35363536
self.query_plan = iter(self._load_balancer.make_query_plan(self.session.keyspace, self.query))
3537+
# Make iterator thread safe when there can be a speculative delay since it could
3538+
# from different threads
3539+
if isinstance(self._spec_execution_plan, NoSpeculativeExecutionPlan):
3540+
self.query_plan = _threadsafe_iter(self.query_plan)
35373541

35383542
def send_request(self, error_no_hosts=True):
35393543
""" Internal """
@@ -4306,3 +4310,16 @@ def paging_state(self):
43064310
avoid sending this to untrusted parties.
43074311
"""
43084312
return self.response_future._paging_state
4313+
4314+
4315+
class _threadsafe_iter(six.Iterator):
4316+
def __init__(self, it):
4317+
self.it = it
4318+
self.lock = Lock()
4319+
4320+
def __iter__(self):
4321+
return self
4322+
4323+
def __next__(self):
4324+
with self.lock:
4325+
return next(self.it)

tests/integration/standard/test_policies.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def test_delay_can_be_0(self):
104104
@expected_result all the queries are executed immediately
105105
@test_category policy
106106
"""
107-
number_of_requests = 6
107+
number_of_requests = 4
108108
spec = ExecutionProfile(speculative_execution_policy=ConstantSpeculativeExecutionPolicy(0, number_of_requests))
109109

110110
cluster = Cluster()

0 commit comments

Comments
 (0)