@@ -1916,6 +1916,7 @@ def default_serial_consistency_level(self, cl):
19161916 _pools = None
19171917 _profile_manager = None
19181918 _metrics = None
1919+ _request_init_callbacks = None
19191920
19201921 def __init__ (self , cluster , hosts , keyspace = None ):
19211922 self .cluster = cluster
@@ -1926,6 +1927,7 @@ def __init__(self, cluster, hosts, keyspace=None):
19261927 self ._pools = {}
19271928 self ._profile_manager = cluster .profile_manager
19281929 self ._metrics = cluster .metrics
1930+ self ._request_init_callbacks = []
19291931 self ._protocol_version = self .cluster .protocol_version
19301932
19311933 self .encoder = Encoder ()
@@ -2018,6 +2020,7 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None
20182020 """
20192021 future = self ._create_response_future (query , parameters , trace , custom_payload , timeout , execution_profile )
20202022 future ._protocol_handler = self .client_protocol_handler
2023+ self ._on_request (future )
20212024 future .send_request ()
20222025 return future
20232026
@@ -2123,6 +2126,38 @@ def execution_profile_clone_update(self, ep, **kwargs):
21232126 setattr (clone , attr , value )
21242127 return clone
21252128
2129+ def add_request_init_listener (self , fn , * args , ** kwargs ):
2130+ """
2131+ Adds a callback with arguments to be called when any request is created.
2132+
2133+ It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created,
2134+ and before the request is sent\*. This can be used to create extensions by adding result callbacks to the
2135+ response future.
2136+
2137+ \* where `response_future` is the :class:`.ResponseFuture` for the request.
2138+
2139+ Note that the init callback is done on the client thread creating the request, so you may need to consider
2140+ synchronization if you have multiple threads. Any callbacks added to the response future will be executed
2141+ on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in
2142+ :meth:`.ResponseFuture.add_callbacks`.
2143+
2144+ See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the
2145+ source tree for an example.
2146+ """
2147+ self ._request_init_callbacks .append ((fn , args , kwargs ))
2148+
2149+ def remove_request_init_listener (self , fn , * args , ** kwargs ):
2150+ """
2151+ Removes a callback and arguments from the list.
2152+
2153+ See :meth:`.Session.add_request_init_listener`.
2154+ """
2155+ self ._request_init_callbacks .remove ((fn , args , kwargs ))
2156+
2157+ def _on_request (self , response_future ):
2158+ for fn , args , kwargs in self ._request_init_callbacks :
2159+ fn (response_future , * args , ** kwargs )
2160+
21262161 def prepare (self , query , custom_payload = None ):
21272162 """
21282163 Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
@@ -3162,6 +3197,11 @@ class ResponseFuture(object):
31623197 Always ``True`` for non-DDL requests.
31633198 """
31643199
3200+ request_encoded_size = None
3201+ """
3202+ Size of the request message sent
3203+ """
3204+
31653205 session = None
31663206 row_factory = None
31673207 message = None
@@ -3285,8 +3325,10 @@ def _query(self, host, message=None, cb=None):
32853325 connection , request_id = pool .borrow_connection (timeout = 2.0 )
32863326 self ._connection = connection
32873327 result_meta = self .prepared_statement .result_metadata if self .prepared_statement else []
3288- connection .send_msg (message , request_id , cb = cb , encoder = self ._protocol_handler .encode_message , decoder = self ._protocol_handler .decode_message ,
3289- result_metadata = result_meta )
3328+ self .request_encoded_size = connection .send_msg (message , request_id , cb = cb ,
3329+ encoder = self ._protocol_handler .encode_message ,
3330+ decoder = self ._protocol_handler .decode_message ,
3331+ result_metadata = result_meta )
32903332 return request_id
32913333 except NoConnectionsAvailable as exc :
32923334 log .debug ("All connections for host %s are at capacity, moving to the next host" , host )
0 commit comments