3232import socket
3333import sys
3434import time
35- from threading import Lock , RLock , Thread , Event
35+ from threading import Lock , RLock , Thread , Event , ThreadError
3636
3737import weakref
3838from weakref import WeakValueDictionary
@@ -2170,7 +2170,8 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
21702170 """
21712171 future = self ._create_response_future (query , parameters , trace , custom_payload , timeout , execution_profile , paging_state )
21722172 future ._protocol_handler = self .client_protocol_handler
2173- self ._on_request (future )
2173+ if self ._request_init_callbacks :
2174+ self ._on_request (future )
21742175 future .send_request ()
21752176 return future
21762177
@@ -3474,7 +3475,9 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
34743475 self ._start_time = start_time or time .time ()
34753476 self ._spec_execution_plan = speculative_execution_plan or self ._spec_execution_plan
34763477 self ._make_query_plan ()
3477- self ._event = Event ()
3478+ self ._lock_event = Lock ()
3479+ self ._lock_event .acquire ()
3480+ self ._is_result_set = False
34783481 self ._errors = {}
34793482 self ._callbacks = []
34803483 self ._errbacks = []
@@ -3547,7 +3550,7 @@ def _on_timeout(self, _attempts=0):
35473550
35483551 def _on_speculative_execute (self ):
35493552 self ._timer = None
3550- if not self ._event . is_set () :
3553+ if not self ._is_result_set :
35513554
35523555 # PYTHON-836, the speculative queries must be after
35533556 # the query is sent from the main thread, otherwise the
@@ -3660,7 +3663,7 @@ def warnings(self):
36603663 Otherwise it may throw if the response has not been received.
36613664 """
36623665 # TODO: When timers are introduced, just make this wait
3663- if not self ._event . is_set () :
3666+ if not self ._is_result_set :
36643667 raise DriverException ("warnings cannot be retrieved before ResponseFuture is finalized" )
36653668 return self ._warnings
36663669
@@ -3678,7 +3681,7 @@ def custom_payload(self):
36783681 :return: :ref:`custom_payload`.
36793682 """
36803683 # TODO: When timers are introduced, just make this wait
3681- if not self ._event . is_set () :
3684+ if not self ._is_result_set :
36823685 raise DriverException ("custom_payload cannot be retrieved before ResponseFuture is finalized" )
36833686 return self ._custom_payload
36843687
@@ -3697,7 +3700,7 @@ def start_fetching_next_page(self):
36973700
36983701 self ._make_query_plan ()
36993702 self .message .paging_state = self ._paging_state
3700- self ._event . clear ()
3703+ self ._lock_event . acquire ()
37013704 self ._final_result = _NOT_SET
37023705 self ._final_exception = None
37033706 self ._start_timer ()
@@ -3941,7 +3944,13 @@ def _set_final_result(self, response):
39413944 for (fn , args , kwargs ) in self ._callbacks
39423945 )
39433946
3944- self ._event .set ()
3947+ self ._is_result_set = True
3948+ try :
3949+ self ._lock_event .release ()
3950+ # This can happen in speculative executions
3951+ # _set_result is called several times
3952+ except ThreadError :
3953+ pass
39453954
39463955 # apply each callback
39473956 for callback_partial in to_call :
@@ -3962,7 +3971,14 @@ def _set_final_exception(self, response):
39623971 partial (fn , response , * args , ** kwargs )
39633972 for (fn , args , kwargs ) in self ._errbacks
39643973 )
3965- self ._event .set ()
3974+
3975+ self ._is_result_set = True
3976+ try :
3977+ self ._lock_event .release ()
3978+ # This can happen in speculative executions
3979+ # _set_result is called several times
3980+ except ThreadError :
3981+ pass
39663982
39673983 # apply each callback
39683984 for callback_partial in to_call :
@@ -4019,7 +4035,13 @@ def result(self):
40194035 ... log.exception("Operation failed:")
40204036
40214037 """
4022- self ._event .wait ()
4038+ self ._lock_event .acquire ()
4039+ try :
4040+ self ._lock_event .release ()
4041+ # This can happen in speculative executions
4042+ # _set_result is called several times
4043+ except ThreadError :
4044+ pass
40234045 if self ._final_result is not _NOT_SET :
40244046 return ResultSet (self , self ._final_result )
40254047 else :
0 commit comments