Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d3d6020
Basic timer management, and timer impl for Asyncore
aholmberg Apr 30, 2015
4a9e08e
Timer for libevreactor
aholmberg Apr 30, 2015
109d6bb
Update ResponseFutures to use reactor timers
aholmberg Apr 30, 2015
6ea1504
Refactor redundant register_watcher[s] to Connection
aholmberg Apr 30, 2015
4d65357
Gevent timer implementation
aholmberg Apr 30, 2015
e3fdffb
Catch and log exceptions during timeout servicing
aholmberg Apr 30, 2015
8702813
refactor connected_event to Connection init
aholmberg Apr 30, 2015
3489813
Eventlet timer using eventlet.event.NOT_USED.
aholmberg May 1, 2015
c44365c
Use threading.Event for eventlet timeouts
aholmberg May 1, 2015
afe2e5a
Twisted timer implementation
aholmberg May 4, 2015
7f5e9df
self-->cls in create_timer classmethods
aholmberg May 4, 2015
67e19aa
Refactor _callbacks and _push_watchers init to Connection
aholmberg May 4, 2015
d7c68f9
Allow ResponseFutures without a timeout
aholmberg May 4, 2015
4b2f046
Remove deprecated, unused import from cqlengine getting started
aholmberg May 4, 2015
60f59ce
Allow canceled timers to finish ahead of end time.
aholmberg May 6, 2015
c17e6e4
libev: service timeouts before updating loop timer.
aholmberg May 6, 2015
9d8c50b
Deprecate ResponseFuture.result timeout
aholmberg May 6, 2015
70478e6
Remove unused 'timeout' in ..cluster.PagedResult
aholmberg May 6, 2015
ce15713
Remove noop from Timer class, use flag.
aholmberg May 7, 2015
fab2521
Explanation of eventletreactor service_timeouts routine
aholmberg May 8, 2015
2d3e330
Explain TwistedLoop add_timer thread interaction
aholmberg May 8, 2015
7e70d55
Coarse locking on TimerManager.service_timeouts
aholmberg May 11, 2015
0306804
Remove lock in TimerManager
aholmberg May 11, 2015
dd73791
Timer queue is now tuple instead of using __lt__ on objects
aholmberg May 11, 2015
8f08a88
populate ResponseFuture OpterationTimedOut with errors, host
aholmberg May 11, 2015
7ab74a7
Remove outdated note about no errback on timeout
aholmberg May 11, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 53 additions & 40 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import sys
import time
from threading import Lock, RLock, Thread, Event
import warnings

import six
from six.moves import range
Expand Down Expand Up @@ -71,6 +72,7 @@
BatchStatement, bind_params, QueryTrace, Statement,
named_tuple_factory, dict_factory, FETCH_SIZE_UNSET)


def _is_eventlet_monkey_patched():
if 'eventlet.patcher' not in sys.modules:
return False
Expand Down Expand Up @@ -1265,8 +1267,7 @@ class Session(object):
"""
A default timeout, measured in seconds, for queries executed through
:meth:`.execute()` or :meth:`.execute_async()`. This default may be
overridden with the `timeout` parameter for either of those methods
or the `timeout` parameter for :meth:`.ResponseFuture.result()`.
overridden with the `timeout` parameter for either of those methods.

Setting this to :const:`None` will cause no timeouts to be set by default.

Expand Down Expand Up @@ -1401,17 +1402,14 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):
trace details, the :attr:`~.Statement.trace` attribute will be left as
:const:`None`.
"""
if timeout is _NOT_SET:
timeout = self.default_timeout

if trace and not isinstance(query, Statement):
raise TypeError(
"The query argument must be an instance of a subclass of "
"cassandra.query.Statement when trace=True")

future = self.execute_async(query, parameters, trace)
future = self.execute_async(query, parameters, trace, timeout)
try:
result = future.result(timeout)
result = future.result()
finally:
if trace:
try:
Expand All @@ -1421,7 +1419,7 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False):

return result

def execute_async(self, query, parameters=None, trace=False):
def execute_async(self, query, parameters=None, trace=False, timeout=_NOT_SET):
"""
Execute the given query and return a :class:`~.ResponseFuture` object
which callbacks may be attached to for asynchronous response
Expand Down Expand Up @@ -1458,11 +1456,14 @@ def execute_async(self, query, parameters=None, trace=False):
... log.exception("Operation failed:")

"""
future = self._create_response_future(query, parameters, trace)
if timeout is _NOT_SET:
timeout = self.default_timeout

future = self._create_response_future(query, parameters, trace, timeout)
future.send_request()
return future

def _create_response_future(self, query, parameters, trace):
def _create_response_future(self, query, parameters, trace, timeout):
""" Returns the ResponseFuture before calling send_request() on it """

prepared_statement = None
Expand Down Expand Up @@ -1513,7 +1514,7 @@ def _create_response_future(self, query, parameters, trace):
message.tracing = True

return ResponseFuture(
self, message, query, self.default_timeout, metrics=self._metrics,
self, message, query, timeout, metrics=self._metrics,
prepared_statement=prepared_statement)

def prepare(self, query):
Expand Down Expand Up @@ -1543,10 +1544,10 @@ def prepare(self, query):
Preparing the same query more than once will likely affect performance.
"""
message = PrepareMessage(query=query)
future = ResponseFuture(self, message, query=None)
future = ResponseFuture(self, message, query=None, timeout=self.default_timeout)
try:
future.send_request()
query_id, column_metadata = future.result(self.default_timeout)
query_id, column_metadata = future.result()
except Exception:
log.exception("Error preparing query:")
raise
Expand All @@ -1571,7 +1572,7 @@ def prepare_on_all_hosts(self, query, excluded_host):
futures = []
for host in self._pools.keys():
if host != excluded_host and host.is_up:
future = ResponseFuture(self, PrepareMessage(query=query), None)
future = ResponseFuture(self, PrepareMessage(query=query), None, self.default_timeout)

# we don't care about errors preparing against specific hosts,
# since we can always prepare them as needed when the prepared
Expand All @@ -1592,7 +1593,7 @@ def prepare_on_all_hosts(self, query, excluded_host):

for host, future in futures:
try:
future.result(self.default_timeout)
future.result()
except Exception:
log.exception("Error preparing query for host %s:", host)

Expand Down Expand Up @@ -2579,13 +2580,14 @@ class ResponseFuture(object):
_start_time = None
_metrics = None
_paging_state = None
_timer = None

def __init__(self, session, message, query, default_timeout=None, metrics=None, prepared_statement=None):
def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None):
self.session = session
self.row_factory = session.row_factory
self.message = message
self.query = query
self.default_timeout = default_timeout
self.timeout = timeout
self._metrics = metrics
self.prepared_statement = prepared_statement
self._callback_lock = Lock()
Expand All @@ -2596,6 +2598,18 @@ def __init__(self, session, message, query, default_timeout=None, metrics=None,
self._errors = {}
self._callbacks = []
self._errbacks = []
self._start_timer()

def _start_timer(self):
if self.timeout is not None:
self._timer = self.session.cluster.connection_class.create_timer(self.timeout, self._on_timeout)

def _cancel_timer(self):
if self._timer:
self._timer.cancel()

def _on_timeout(self):
self._set_final_exception(OperationTimedOut(self._errors, self._current_host))

def _make_query_plan(self):
# convert the list/generator/etc to an iterator so that subsequent
Expand Down Expand Up @@ -2684,6 +2698,7 @@ def start_fetching_next_page(self):
self._event.clear()
self._final_result = _NOT_SET
self._final_exception = None
self._start_timer()
self.send_request()

def _reprepare(self, prepare_message):
Expand Down Expand Up @@ -2888,6 +2903,7 @@ def _execute_after_prepare(self, response):
"statement on host %s: %s" % (self._current_host, response)))

def _set_final_result(self, response):
self._cancel_timer()
if self._metrics is not None:
self._metrics.request_timer.addValue(time.time() - self._start_time)

Expand All @@ -2902,6 +2918,7 @@ def _set_final_result(self, response):
fn(response, *args, **kwargs)

def _set_final_exception(self, response):
self._cancel_timer()
if self._metrics is not None:
self._metrics.request_timer.addValue(time.time() - self._start_time)

Expand Down Expand Up @@ -2945,6 +2962,11 @@ def result(self, timeout=_NOT_SET):
encountered. If the final result or error has not been set
yet, this method will block until that time.

.. versionchanged:: 2.6.0

**`timeout` is deprecated. Use timeout in the Session execute functions instead.
The following description applies to deprecated behavior:**

You may set a timeout (in seconds) with the `timeout` parameter.
By default, the :attr:`~.default_timeout` for the :class:`.Session`
this was created through will be used for the timeout on this
Expand All @@ -2958,11 +2980,6 @@ def result(self, timeout=_NOT_SET):
This is a client-side timeout. For more information
about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`.

**Important**: This timeout currently has no effect on callbacks registered
on a :class:`~.ResponseFuture` through :meth:`.ResponseFuture.add_callback` or
:meth:`.ResponseFuture.add_errback`; even if a query exceeds this default
timeout, neither the registered callback or errback will be called.

Example usage::

>>> future = session.execute_async("SELECT * FROM mycf")
Expand All @@ -2976,27 +2993,24 @@ def result(self, timeout=_NOT_SET):
... log.exception("Operation failed:")

"""
if timeout is _NOT_SET:
timeout = self.default_timeout
if timeout is not _NOT_SET:
msg = "ResponseFuture.result timeout argument is deprecated. Specify the request timeout via Session.execute[_async]."
warnings.warn(msg, DeprecationWarning)
log.warning(msg)
else:
timeout = None

self._event.wait(timeout)
# TODO: remove this conditional when deprecated timeout parameter is removed
if not self._event.is_set():
self._on_timeout()
if self._final_result is not _NOT_SET:
if self._paging_state is None:
return self._final_result
else:
return PagedResult(self, self._final_result, timeout)
elif self._final_exception:
raise self._final_exception
return PagedResult(self, self._final_result)
else:
self._event.wait(timeout=timeout)
if self._final_result is not _NOT_SET:
if self._paging_state is None:
return self._final_result
else:
return PagedResult(self, self._final_result, timeout)
elif self._final_exception:
raise self._final_exception
else:
raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
raise self._final_exception

def get_query_trace(self, max_wait=None):
"""
Expand Down Expand Up @@ -3146,10 +3160,9 @@ class will be returned.

response_future = None

def __init__(self, response_future, initial_response, timeout=_NOT_SET):
def __init__(self, response_future, initial_response):
self.response_future = response_future
self.current_response = iter(initial_response)
self.timeout = timeout

def __iter__(self):
return self
Expand All @@ -3162,7 +3175,7 @@ def next(self):
raise

self.response_future.start_fetching_next_page()
result = self.response_future.result(self.timeout)
result = self.response_future.result()
if self.response_future.has_more_pages:
self.current_response = result.current_response
else:
Expand Down
Loading