Skip to content

Commit 302a36e

Browse files
committed
Replaced event in ResponseFuture for lock
1 parent 124f44f commit 302a36e

4 files changed

Lines changed: 38 additions & 15 deletions

File tree

cassandra/cluster.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import socket
3333
import sys
3434
import time
35-
from threading import Lock, RLock, Thread, Event
35+
from threading import Lock, RLock, Thread, Event, ThreadError
3636

3737
import weakref
3838
from weakref import WeakValueDictionary
@@ -2170,7 +2170,8 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
21702170
"""
21712171
future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state)
21722172
future._protocol_handler = self.client_protocol_handler
2173-
self._on_request(future)
2173+
if self._request_init_callbacks:
2174+
self._on_request(future)
21742175
future.send_request()
21752176
return future
21762177

@@ -3474,7 +3475,9 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
34743475
self._start_time = start_time or time.time()
34753476
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
34763477
self._make_query_plan()
3477-
self._event = Event()
3478+
self._lock_event = Lock()
3479+
self._lock_event.acquire()
3480+
self._is_result_set = False
34783481
self._errors = {}
34793482
self._callbacks = []
34803483
self._errbacks = []
@@ -3547,7 +3550,7 @@ def _on_timeout(self, _attempts=0):
35473550

35483551
def _on_speculative_execute(self):
35493552
self._timer = None
3550-
if not self._event.is_set():
3553+
if not self._is_result_set:
35513554

35523555
# PYTHON-836, the speculative queries must be after
35533556
# the query is sent from the main thread, otherwise the
@@ -3660,7 +3663,7 @@ def warnings(self):
36603663
Otherwise it may throw if the response has not been received.
36613664
"""
36623665
# TODO: When timers are introduced, just make this wait
3663-
if not self._event.is_set():
3666+
if not self._is_result_set:
36643667
raise DriverException("warnings cannot be retrieved before ResponseFuture is finalized")
36653668
return self._warnings
36663669

@@ -3678,7 +3681,7 @@ def custom_payload(self):
36783681
:return: :ref:`custom_payload`.
36793682
"""
36803683
# TODO: When timers are introduced, just make this wait
3681-
if not self._event.is_set():
3684+
if not self._is_result_set:
36823685
raise DriverException("custom_payload cannot be retrieved before ResponseFuture is finalized")
36833686
return self._custom_payload
36843687

@@ -3697,7 +3700,7 @@ def start_fetching_next_page(self):
36973700

36983701
self._make_query_plan()
36993702
self.message.paging_state = self._paging_state
3700-
self._event.clear()
3703+
self._lock_event.acquire()
37013704
self._final_result = _NOT_SET
37023705
self._final_exception = None
37033706
self._start_timer()
@@ -3941,7 +3944,13 @@ def _set_final_result(self, response):
39413944
for (fn, args, kwargs) in self._callbacks
39423945
)
39433946

3944-
self._event.set()
3947+
self._is_result_set = True
3948+
try:
3949+
self._lock_event.release()
3950+
# This can happen in speculative executions
3951+
# _set_result is called several times
3952+
except ThreadError:
3953+
pass
39453954

39463955
# apply each callback
39473956
for callback_partial in to_call:
@@ -3962,7 +3971,14 @@ def _set_final_exception(self, response):
39623971
partial(fn, response, *args, **kwargs)
39633972
for (fn, args, kwargs) in self._errbacks
39643973
)
3965-
self._event.set()
3974+
3975+
self._is_result_set = True
3976+
try:
3977+
self._lock_event.release()
3978+
# This can happen in speculative executions
3979+
# _set_result is called several times
3980+
except ThreadError:
3981+
pass
39663982

39673983
# apply each callback
39683984
for callback_partial in to_call:
@@ -4019,7 +4035,13 @@ def result(self):
40194035
... log.exception("Operation failed:")
40204036
40214037
"""
4022-
self._event.wait()
4038+
self._lock_event.acquire()
4039+
try:
4040+
self._lock_event.release()
4041+
# This can happen in speculative executions
4042+
# _set_result is called several times
4043+
except ThreadError:
4044+
pass
40234045
if self._final_result is not _NOT_SET:
40244046
return ResultSet(self, self._final_result)
40254047
else:

tests/integration/simulacron/test_connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ def test_heart_beat_timeout(self):
129129
futures.append(future)
130130

131131
for f in futures:
132-
f._event.wait()
132+
f._lock_event.acquire()
133133
self.assertIsInstance(f._final_exception, OperationTimedOut)
134134

135135
prime_request(PrimeOptions(then=NO_THEN))
@@ -333,7 +333,7 @@ def test_host_is_not_set_to_down_after_query_oto(self):
333333
futures.append(future)
334334

335335
for f in futures:
336-
f._event.wait()
336+
f._lock_event.acquire()
337337
self.assertIsInstance(f._final_exception, OperationTimedOut)
338338

339339
self.assertEqual(listener.hosts_marked_down, [])

tests/integration/simulacron/test_policies.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def test_speculative_and_timeout(self):
162162
# we have to directly wait for the event
163163
response_future = self.session.execute_async(statement, execution_profile='spec_ep_brr_lim',
164164
timeout=14)
165-
response_future._event.wait(16)
165+
response_future._lock_event.acquire(16)
166166
self.assertIsInstance(response_future._final_exception, OperationTimedOut)
167167

168168
# This is because 14 / 4 + 1 = 4

tests/unit/test_response_future.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,10 +303,11 @@ def test_first_pool_shutdown(self):
303303
pool_shutdown = self.make_pool()
304304
pool_shutdown.is_shutdown = True
305305
pool_ok = self.make_pool()
306-
pool_ok.is_shutdown = True
306+
pool_ok.is_shutdown = False
307307
session._pools.get.side_effect = [pool_shutdown, pool_ok]
308308

309309
rf = self.make_response_future(session)
310+
rf.query_plan = ['ip1', 'ip2']
310311
rf.send_request()
311312

312313
rf._set_result(None, None, None, self.make_mock_response([{'col': 'val'}]))
@@ -420,6 +421,7 @@ def test_multiple_errbacks(self):
420421
retry_policy = Mock()
421422
retry_policy.on_unavailable.return_value = (RetryPolicy.RETHROW, None)
422423
rf = ResponseFuture(session, message, query, 1, retry_policy=retry_policy)
424+
rf._query_retries = 1
423425
rf.send_request()
424426

425427
callback = Mock()
@@ -436,7 +438,6 @@ def test_multiple_errbacks(self):
436438
result = Mock(spec=UnavailableErrorMessage, info={"required_replicas":2, "alive_replicas": 1, "consistency": 1})
437439
result.to_exception.return_value = expected_exception
438440
rf._set_result(None, None, None, result)
439-
rf._event.set()
440441
self.assertRaises(Exception, rf.result)
441442

442443
callback.assert_called_once_with(expected_exception, arg, **kwargs)

0 commit comments

Comments
 (0)