Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ setuptools*.egg
cassandra/*.c
!cassandra/cmurmur3.c
cassandra/*.html
tests/unit/cython/bytesio_testhelper.c

# OSX
.DS_Store
Expand Down
185 changes: 120 additions & 65 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
BatchMessage, RESULT_KIND_PREPARED,
RESULT_KIND_SET_KEYSPACE, RESULT_KIND_ROWS,
RESULT_KIND_SCHEMA_CHANGE, MIN_SUPPORTED_VERSION,
ProtocolHandler)
ProtocolHandler, _RESULT_SEQUENCE_TYPES)
from cassandra.metadata import Metadata, protect_name, murmur3
from cassandra.policies import (TokenAwarePolicy, DCAwareRoundRobinPolicy, SimpleConvictionPolicy,
ExponentialReconnectionPolicy, HostDistance,
Expand Down Expand Up @@ -1601,34 +1601,14 @@ def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_
no timeout. Please see :meth:`.ResponseFuture.result` for details on
the scope and effect of this timeout.

If `trace` is set to :const:`True`, an attempt will be made to
fetch the trace details and attach them to the `query`'s
:attr:`~.Statement.trace` attribute in the form of a :class:`.QueryTrace`
instance. This requires that `query` be a :class:`.Statement` subclass
instance and not just a string. If there is an error fetching the
trace details, the :attr:`~.Statement.trace` attribute will be left as
:const:`None`.
If `trace` is set to :const:`True`, the query will be sent with tracing enabled.
The trace details can be obtained using the returned :class:`.ResultSet` object.

`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
If `query` is a Statement with its own custom_payload. The message payload
will be a union of the two, with the values specified here taking precedence.
"""
if trace and not isinstance(query, Statement):
raise TypeError(
"The query argument must be an instance of a subclass of "
"cassandra.query.Statement when trace=True")

future = self.execute_async(query, parameters, trace, custom_payload, timeout)
try:
result = future.result()
finally:
if trace:
try:
query.trace = future.get_query_trace(self.max_trace_wait)
except Exception:
log.exception("Unable to fetch query trace:")

return result
return self.execute_async(query, parameters, trace, custom_payload, timeout).result()

def execute_async(self, query, parameters=None, trace=False, custom_payload=None, timeout=_NOT_SET):
"""
Expand All @@ -1638,9 +1618,9 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
on the :class:`.ResponseFuture` to syncronously block for results at
any time.

If `trace` is set to :const:`True`, you may call
:meth:`.ResponseFuture.get_query_trace()` after the request
completes to retrieve a :class:`.QueryTrace` instance.
If `trace` is set to :const:`True`, you may get the query trace descriptors using
:meth:`.ResponseFuture.get_query_trace()` or :meth:`.ResponseFuture.get_all_query_traces()`
on the future result.

`custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
If `query` is a Statement with its own custom_payload. The message payload
Expand Down Expand Up @@ -1730,8 +1710,7 @@ def _create_response_future(self, query, parameters, trace, custom_payload, time
query.batch_type, query._statements_and_parameters, cl,
query.serial_consistency_level, timestamp)

if trace:
message.tracing = True
message.tracing = trace

message.update_custom_payload(query.custom_payload)
message.update_custom_payload(custom_payload)
Expand Down Expand Up @@ -2710,7 +2689,7 @@ class ResponseFuture(object):
_req_id = None
_final_result = _NOT_SET
_final_exception = None
_query_trace = None
_query_traces = None
_callbacks = None
_errbacks = None
_current_host = None
Expand Down Expand Up @@ -2901,9 +2880,9 @@ def _set_result(self, response):

trace_id = getattr(response, 'trace_id', None)
if trace_id:
if self.query:
self.query.trace_id = trace_id
self._query_trace = QueryTrace(trace_id, self.session)
if not self._query_traces:
self._query_traces = []
self._query_traces.append(QueryTrace(trace_id, self.session))

self._warnings = getattr(response, 'warnings', None)
self._custom_payload = getattr(response, 'custom_payload', None)
Expand All @@ -2930,7 +2909,7 @@ def _set_result(self, response):
self, **response.results)
else:
results = getattr(response, 'results', None)
if results is not None and response.kind == RESULT_KIND_ROWS:
if results is not None and response.kind ==RESULT_KIND_ROWS:
self._paging_state = response.paging_state
results = self.row_factory(*results)
self._set_final_result(results)
Expand Down Expand Up @@ -3197,27 +3176,37 @@ def result(self, timeout=_NOT_SET):
if not self._event.is_set():
self._on_timeout()
if self._final_result is not _NOT_SET:
if self._paging_state is None:
return self._final_result
else:
return PagedResult(self, self._final_result)
return ResultSet(self, self._final_result)
else:
raise self._final_exception

def get_query_trace(self, max_wait=None):
"""
Returns the :class:`~.query.QueryTrace` instance representing a trace
of the last attempt for this operation, or :const:`None` if tracing was
not enabled for this query. Note that this may raise an exception if
there are problems retrieving the trace details from Cassandra. If the
trace is not available after `max_wait` seconds,
Fetches and returns the query trace of the last response, or `None` if tracing was
not enabled.

Note that this may raise an exception if there are problems retrieving the trace
details from Cassandra. If the trace is not available after `max_wait_sec`,
:exc:`cassandra.query.TraceUnavailable` will be raised.
"""
if not self._query_trace:
return None
if self._query_traces:
return self._get_query_trace(len(self._query_traces) - 1, max_wait)

self._query_trace.populate(max_wait)
return self._query_trace
def get_all_query_traces(self, max_wait_per=None):
"""
Fetches and returns the query traces for all query pages, if tracing was enabled.

See note in :meth:`~.get_query_trace` regarding possible exceptions.
"""
if self._query_traces:
return [self._get_query_trace(i, max_wait_per) for i in range(len(self._query_traces))]
return []

def _get_query_trace(self, i, max_wait):
trace = self._query_traces[i]
if not trace.events:
trace.populate(max_wait=max_wait)
return trace

def add_callback(self, fn, *args, **kwargs):
"""
Expand Down Expand Up @@ -3333,51 +3322,117 @@ class QueryExhausted(Exception):
pass


class PagedResult(object):
class ResultSet(object):
"""
An iterator over the rows from a paged query result. Whenever the number
of result rows for a query exceed the :attr:`~.query.Statement.fetch_size`
(or :attr:`~.Session.default_fetch_size`, if not set) an instance of this
class will be returned.
An iterator over the rows from a query result. Also supplies basic equality
and indexing methods for backward-compatability. These methods materialize
the entire result set (loading all pages), and should only be used if the
total result size is understood. Warnings are emitted when paged results
are materialized in this fashion.

You can treat this as a normal iterator over rows::

>>> from cassandra.query import SimpleStatement
>>> statement = SimpleStatement("SELECT * FROM users", fetch_size=10)
>>> for user_row in session.execute(statement):
... process_user(user_row)
... process_user(user_rowt

Whenever there are no more rows in the current page, the next page will
be fetched transparently. However, note that it *is* possible for
an :class:`Exception` to be raised while fetching the next page, just
like you might see on a normal call to ``session.execute()``.

.. versionadded: 2.0.0
"""

response_future = None

def __init__(self, response_future, initial_response):
self.response_future = response_future
self.current_response = iter(initial_response)
self._set_current_rows(initial_response)
self._page_iter = None
self._list_mode = False

@property
def has_more_pages(self):
"""
True if the last response indicated more pages; False otherwise
"""
return self.response_future.has_more_pages

@property
def current_rows(self):
return self._current_rows or []

def __iter__(self):
if self._list_mode:
return iter(self._current_rows)
self._page_iter = iter(self._current_rows)
return self

def next(self):
try:
return next(self.current_response)
return next(self._page_iter)
except StopIteration:
if not self.response_future.has_more_pages:
if not self._list_mode:
self._current_rows = []
raise

self.response_future.start_fetching_next_page()
result = self.response_future.result()
self.fetch_next_page()
self._page_iter = iter(self._current_rows)

return next(self._page_iter)

__next__ = next

def fetch_next_page(self):
if self.response_future.has_more_pages:
self.current_response = result.current_response
self.response_future.start_fetching_next_page()
result = self.response_future.result()
self._current_rows = result._current_rows # ResultSet has already _set_current_rows to the appropriate form
else:
self.current_response = iter(result)
self._current_rows = []

return next(self.current_response)
def _set_current_rows(self, result):
if isinstance(result, _RESULT_SEQUENCE_TYPES):
self._current_rows = result
else:
self._current_rows = [result] if result else []

__next__ = next
def _fetch_all(self):
self._current_rows = list(self)
self._page_iter = None

def _enter_list_mode(self, operator):
if self._list_mode:
return
if self._page_iter:
raise RuntimeError("Cannot use %s when results have been iterated." % operator)
if self.response_future.has_more_pages:
log.warning("Using %s on paged results causes entire result set to be materialized.", operator)
self._fetch_all()
self._list_mode = True

def __eq__(self, other):
self._enter_list_mode("equality operator")
return self._current_rows == other

def __getitem__(self, i):
self._enter_list_mode("index operator")
return self._current_rows[i]

def __nonzero__(self):
return bool(self._current_rows)

__bool__ = __nonzero__

def get_query_trace(self, max_wait_sec=None):
"""
Gets the last query trace from the associated future.
See :meth:`.ResponseFuture.get_query_trace` for details.
"""
return self.response_future.get_query_trace(max_wait_sec)

def get_all_query_traces(self, max_wait_sec_per=None):
"""
Gets all query traces from the associated future.
See :meth:`.ResponseFuture.get_all_query_traces` for details.
"""
return self.response_future.get_all_query_traces(max_wait_sec_per)
8 changes: 3 additions & 5 deletions cassandra/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from threading import Condition
import sys

from cassandra.cluster import PagedResult
from cassandra.cluster import ResultSet

import logging
log = logging.getLogger(__name__)
Expand Down Expand Up @@ -134,10 +134,8 @@ def _execute(self, idx, statement, params):
self._put_result(e, idx, False)

def _on_success(self, result, future, idx):
if future.has_more_pages:
result = PagedResult(future, result)
future.clear_callbacks()
self._put_result(result, idx, True)
future.clear_callbacks()
self._put_result(ResultSet(future, result), idx, True)

def _on_error(self, result, future, idx):
self._put_result(result, idx, False)
Expand Down
4 changes: 4 additions & 0 deletions cassandra/obj_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ cdef class LazyParser(ColumnParser):
# supported in cpdef methods
return parse_rows_lazy(reader, desc)

cpdef get_cython_generator_type(self):
return get_cython_generator_type()

def parse_rows_lazy(BytesIOReader reader, ParseDesc desc):
cdef Py_ssize_t i, rowcount
rowcount = read_int(reader)
cdef RowParser rowparser = TupleRowParser()
return (rowparser.unpack_row(reader, desc) for i in range(rowcount))

def get_cython_generator_type():
return type((i for i in range(0)))

cdef class TupleRowParser(RowParser):
"""
Expand Down
4 changes: 4 additions & 0 deletions cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ def decode_message(cls, protocol_version, user_type_map, stream_id, flags, opcod

return msg

_RESULT_SEQUENCE_TYPES = (list, tuple) # types retuned by ResultMessages

def cython_protocol_handler(colparser):
"""
Expand Down Expand Up @@ -1045,6 +1046,9 @@ class CythonProtocolHandler(ProtocolHandler):
if HAVE_CYTHON:
from cassandra.obj_parser import ListParser, LazyParser
ProtocolHandler = cython_protocol_handler(ListParser())

lazy_parser = LazyParser()
_RESULT_SEQUENCE_TYPES += (lazy_parser.get_cython_generator_type(),)
LazyProtocolHandler = cython_protocol_handler(LazyParser())
else:
# Use Python-based ProtocolHandler
Expand Down
12 changes: 0 additions & 12 deletions cassandra/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,6 @@ class Statement(object):
will be retried.
"""

trace = None
"""
If :meth:`.Session.execute()` is run with `trace` set to :const:`True`,
this will be set to a :class:`.QueryTrace` instance.
"""

trace_id = None
"""
If :meth:`.Session.execute()` is run with `trace` set to :const:`True`,
this will be set to the tracing ID from the server.
"""

consistency_level = None
"""
The :class:`.ConsistencyLevel` to be used for this operation. Defaults
Expand Down
4 changes: 3 additions & 1 deletion docs/api/cassandra/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@

.. automethod:: get_query_trace()

.. automethod:: get_all_query_traces()

.. autoattribute:: custom_payload()

.. autoattribute:: has_more_pages
Expand All @@ -140,7 +142,7 @@

.. automethod:: add_callbacks(callback, errback, callback_args=(), callback_kwargs=None, errback_args=(), errback_args=None)

.. autoclass:: PagedResult ()
.. autoclass:: ResultSet ()
:members:

.. autoexception:: QueryExhausted ()
Expand Down
Loading