Skip to content

Commit fe3cf63

Browse files
authored
Merge pull request apache#646 from datastax/284
PYTHON-284 - request size attribute, request init listener
2 parents 67a9367 + 5207686 commit fe3cf63

5 files changed

Lines changed: 166 additions & 4 deletions

File tree

cassandra/cluster.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,6 +1916,7 @@ def default_serial_consistency_level(self, cl):
19161916
_pools = None
19171917
_profile_manager = None
19181918
_metrics = None
1919+
_request_init_callbacks = None
19191920

19201921
def __init__(self, cluster, hosts, keyspace=None):
19211922
self.cluster = cluster
@@ -1926,6 +1927,7 @@ def __init__(self, cluster, hosts, keyspace=None):
19261927
self._pools = {}
19271928
self._profile_manager = cluster.profile_manager
19281929
self._metrics = cluster.metrics
1930+
self._request_init_callbacks = []
19291931
self._protocol_version = self.cluster.protocol_version
19301932

19311933
self.encoder = Encoder()
@@ -2018,6 +2020,7 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
20182020
"""
20192021
future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile)
20202022
future._protocol_handler = self.client_protocol_handler
2023+
self._on_request(future)
20212024
future.send_request()
20222025
return future
20232026

@@ -2123,6 +2126,38 @@ def execution_profile_clone_update(self, ep, **kwargs):
21232126
setattr(clone, attr, value)
21242127
return clone
21252128

2129+
def add_request_init_listener(self, fn, *args, **kwargs):
2130+
"""
2131+
Adds a callback with arguments to be called when any request is created.
2132+
2133+
It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created,
2134+
and before the request is sent\*. This can be used to create extensions by adding result callbacks to the
2135+
response future.
2136+
2137+
\* where `response_future` is the :class:`.ResponseFuture` for the request.
2138+
2139+
Note that the init callback is done on the client thread creating the request, so you may need to consider
2140+
synchronization if you have multiple threads. Any callbacks added to the response future will be executed
2141+
on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in
2142+
:meth:`.ResponseFuture.add_callbacks`.
2143+
2144+
See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the
2145+
source tree for an example.
2146+
"""
2147+
self._request_init_callbacks.append((fn, args, kwargs))
2148+
2149+
def remove_request_init_listener(self, fn, *args, **kwargs):
2150+
"""
2151+
Removes a callback and arguments from the list.
2152+
2153+
See :meth:`.Session.add_request_init_listener`.
2154+
"""
2155+
self._request_init_callbacks.remove((fn, args, kwargs))
2156+
2157+
def _on_request(self, response_future):
2158+
for fn, args, kwargs in self._request_init_callbacks:
2159+
fn(response_future, *args, **kwargs)
2160+
21262161
def prepare(self, query, custom_payload=None):
21272162
"""
21282163
Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
@@ -3162,6 +3197,11 @@ class ResponseFuture(object):
31623197
Always ``True`` for non-DDL requests.
31633198
"""
31643199

3200+
request_encoded_size = None
3201+
"""
3202+
Size of the request message sent
3203+
"""
3204+
31653205
session = None
31663206
row_factory = None
31673207
message = None
@@ -3285,8 +3325,10 @@ def _query(self, host, message=None, cb=None):
32853325
connection, request_id = pool.borrow_connection(timeout=2.0)
32863326
self._connection = connection
32873327
result_meta = self.prepared_statement.result_metadata if self.prepared_statement else []
3288-
connection.send_msg(message, request_id, cb=cb, encoder=self._protocol_handler.encode_message, decoder=self._protocol_handler.decode_message,
3289-
result_metadata=result_meta)
3328+
self.request_encoded_size = connection.send_msg(message, request_id, cb=cb,
3329+
encoder=self._protocol_handler.encode_message,
3330+
decoder=self._protocol_handler.decode_message,
3331+
result_metadata=result_meta)
32903332
return request_id
32913333
except NoConnectionsAvailable as exc:
32923334
log.debug("All connections for host %s are at capacity, moving to the next host", host)

cassandra/connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,8 +458,9 @@ def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message,
458458
# queue the decoder function with the request
459459
# this allows us to inject custom functions per request to encode, decode messages
460460
self._requests[request_id] = (cb, decoder, result_metadata)
461-
self.push(encoder(msg, request_id, self.protocol_version, compressor=self.compressor, allow_beta_protocol_version=self.allow_beta_protocol_version))
462-
return request_id
461+
msg = encoder(msg, request_id, self.protocol_version, compressor=self.compressor, allow_beta_protocol_version=self.allow_beta_protocol_version)
462+
self.push(msg)
463+
return len(msg)
463464

464465
def wait_for_response(self, msg, timeout=None):
465466
return self.wait_for_responses(msg, timeout=timeout)[0]

docs/api/cassandra/cluster.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@
150150

151151
.. automethod:: execution_profile_clone_update
152152

153+
.. automethod:: add_request_init_listener
154+
155+
.. automethod:: remove_request_init_listener
156+
153157
.. autoclass:: ResponseFuture ()
154158

155159
.. autoattribute:: query

examples/README.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Driver Examples
2+
===============
3+
This directory will contain a set of scripts demonstrating driver APIs or integration techniques. It will not be exhaustive, but will contain examples where they are too involved, or
4+
open-ended to include inline in the docstrings. In that case, they should be referenced from the docstrings
5+
6+
Features
7+
--------
8+
* `request_init_listener.py <request_init_listener.py>`_ A script demonstrating how to register a session request listener and use it to track alternative metrics about requests (size, for example).

examples/request_init_listener.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#!/usr/bin/env python
2+
# Copyright 2013-2016 DataStax, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# This script shows an example "request init listener" which can be registered to track certain request metrics
17+
# for a session. In this case we're just accumulating total request and error counts, as well as some statistics
18+
# about the encoded request size. Note that the counts would be available using the internal 'metrics' tracking --
19+
# this is just demonstrating a way to track a few custom attributes.
20+
21+
from __future__ import print_function
22+
from cassandra.cluster import Cluster
23+
from greplin import scales
24+
25+
import pprint
26+
pp = pprint.PrettyPrinter(indent=2)
27+
28+
29+
class RequestAnalyzer(object):
30+
"""
31+
Class used to track request and error counts for a Session.
32+
33+
Also computes statistics on encoded request size.
34+
"""
35+
36+
requests = scales.PmfStat('request size')
37+
errors = scales.IntStat('errors')
38+
39+
def __init__(self, session):
40+
scales.init(self, '/cassandra')
41+
# each instance will be registered with a session, and receive a callback for each request generated
42+
session.add_request_init_listener(self.on_request)
43+
44+
def on_request(self, rf):
45+
# This callback is invoked each time a request is created, on the thread creating the request.
46+
# We can use this to count events, or add callbacks
47+
rf.add_callbacks(self.on_success, self.on_error, callback_args=(rf,), errback_args=(rf,))
48+
49+
def on_success(self, _, response_future):
50+
# future callback on a successful request; just record the size
51+
self.requests.addValue(response_future.request_encoded_size)
52+
53+
def on_error(self, _, response_future):
54+
# future callback for failed; record size and increment errors
55+
self.requests.addValue(response_future.request_encoded_size)
56+
self.errors += 1
57+
58+
def __str__(self):
59+
# just extracting request count from the size stats (which are recorded on all requests)
60+
request_sizes = dict(self.requests)
61+
count = request_sizes.pop('count')
62+
return "%d requests (%d errors)\nRequest size statistics:\n%s" % (count, self.errors, pp.pformat(request_sizes))
63+
64+
65+
# connect a session
66+
session = Cluster().connect()
67+
68+
# attach a listener to this session
69+
ra = RequestAnalyzer(session)
70+
71+
session.execute("SELECT release_version FROM system.local")
72+
session.execute("SELECT release_version FROM system.local")
73+
74+
print(ra)
75+
# 2 requests (0 errors)
76+
# Request size statistics:
77+
# { '75percentile': 74,
78+
# '95percentile': 74,
79+
# '98percentile': 74,
80+
# '999percentile': 74,
81+
# '99percentile': 74,
82+
# 'max': 74,
83+
# 'mean': 74.0,
84+
# 'median': 74.0,
85+
# 'min': 74,
86+
# 'stddev': 0.0}
87+
88+
try:
89+
# intentional error to show that count increase
90+
session.execute("syntax err")
91+
except Exception as e:
92+
pass
93+
94+
print()
95+
print(ra) # note: the counts are updated, but the stats are not because scales only updates every 20s
96+
# 3 requests (1 errors)
97+
# Request size statistics:
98+
# { '75percentile': 74,
99+
# '95percentile': 74,
100+
# '98percentile': 74,
101+
# '999percentile': 74,
102+
# '99percentile': 74,
103+
# 'max': 74,
104+
# 'mean': 74.0,
105+
# 'median': 74.0,
106+
# 'min': 74,
107+
# 'stddev': 0.0}

0 commit comments

Comments
 (0)