|
70 | 70 | BatchStatement, bind_params, QueryTrace, TraceUnavailable, |
71 | 71 | named_tuple_factory, dict_factory, tuple_factory, FETCH_SIZE_UNSET) |
72 | 72 | from cassandra.timestamps import MonotonicTimestampGenerator |
| 73 | +from cassandra.context import DriverContext |
73 | 74 |
|
74 | 75 |
|
75 | 76 | def _is_eventlet_monkey_patched(): |
@@ -651,6 +652,8 @@ def _default_load_balancing_policy(self): |
651 | 652 | documentation for :meth:`Session.timestamp_generator`. |
652 | 653 | """ |
653 | 654 |
|
| 655 | + _context = None |
| 656 | + |
654 | 657 | @property |
655 | 658 | def schema_metadata_enabled(self): |
656 | 659 | """ |
@@ -734,13 +737,16 @@ def __init__(self, |
734 | 737 | allow_beta_protocol_version=False, |
735 | 738 | timestamp_generator=None, |
736 | 739 | idle_heartbeat_timeout=30, |
737 | | - no_compact=False): |
| 740 | + no_compact=False, |
| 741 | + context=None): |
738 | 742 | """ |
739 | 743 | ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as |
740 | 744 | extablishing connection pools or refreshing metadata. |
741 | 745 |
|
742 | 746 | Any of the mutable Cluster attributes may be set as keyword arguments to the constructor. |
743 | 747 | """ |
| 748 | + self._context = context or DriverContext() |
| 749 | + |
744 | 750 | if contact_points is not None: |
745 | 751 | if contact_points is _NOT_SET: |
746 | 752 | self._contact_points_explicit = False |
@@ -964,11 +970,14 @@ def connection_factory(self, address, *args, **kwargs): |
964 | 970 | Intended for internal use only. |
965 | 971 | """ |
966 | 972 | kwargs = self._make_connection_kwargs(address, kwargs) |
967 | | - return self.connection_class.factory(address, self.connect_timeout, *args, **kwargs) |
| 973 | + return self.connection_class.factory( |
| 974 | + self._context.protocol_handler, address, self.connect_timeout, |
| 975 | + *args, **kwargs) |
968 | 976 |
|
969 | 977 | def _make_connection_factory(self, host, *args, **kwargs): |
970 | 978 | kwargs = self._make_connection_kwargs(host.address, kwargs) |
971 | | - return partial(self.connection_class.factory, host.address, self.connect_timeout, *args, **kwargs) |
| 979 | + return partial(self.connection_class.factory, self._context.protocol_handler, host.address, |
| 980 | + self.connect_timeout, *args, **kwargs) |
972 | 981 |
|
973 | 982 | def _make_connection_kwargs(self, address, kwargs_dict): |
974 | 983 | if self._auth_provider_callable: |
@@ -1098,7 +1107,7 @@ def __exit__(self, *args): |
1098 | 1107 | self.shutdown() |
1099 | 1108 |
|
1100 | 1109 | def _new_session(self, keyspace): |
1101 | | - session = Session(self, self.metadata.all_hosts(), keyspace) |
| 1110 | + session = Session(self, self.metadata.all_hosts(), keyspace, context=self._context) |
1102 | 1111 | self._session_register_user_types(session) |
1103 | 1112 | self.sessions.add(session) |
1104 | 1113 | return session |
@@ -1710,24 +1719,18 @@ class Session(object): |
1710 | 1719 | .. versionadded:: 2.1.0 |
1711 | 1720 | """ |
1712 | 1721 |
|
1713 | | - client_protocol_handler = ProtocolHandler |
1714 | | - """ |
1715 | | - Specifies a protocol handler that will be used for client-initiated requests (i.e. no |
1716 | | - internal driver requests). This can be used to override or extend features such as |
1717 | | - message or type ser/des. |
1718 | | -
|
1719 | | - The default pure python implementation is :class:`cassandra.protocol.ProtocolHandler`. |
1720 | | -
|
1721 | | - When compiled with Cython, there are also built-in faster alternatives. See :ref:`faster_deser` |
1722 | | - """ |
1723 | | - |
| 1722 | + _protocol_handler_class = ProtocolHandler |
| 1723 | + _protocol_handler = None |
1724 | 1724 | _lock = None |
1725 | 1725 | _pools = None |
1726 | 1726 | _profile_manager = None |
1727 | 1727 | _metrics = None |
1728 | 1728 | _request_init_callbacks = None |
| 1729 | + _context = None |
| 1730 | + |
| 1731 | + def __init__(self, cluster, hosts, keyspace=None, context=None): |
| 1732 | + self._context = context or DriverContext() |
1729 | 1733 |
|
1730 | | - def __init__(self, cluster, hosts, keyspace=None): |
1731 | 1734 | self.cluster = cluster |
1732 | 1735 | self.hosts = hosts |
1733 | 1736 | self.keyspace = keyspace |
@@ -1757,6 +1760,26 @@ def __init__(self, cluster, hosts, keyspace=None): |
1757 | 1760 | msg += " using keyspace '%s'" % self.keyspace |
1758 | 1761 | raise NoHostAvailable(msg, [h.address for h in hosts]) |
1759 | 1762 |
|
| 1763 | + @property |
| 1764 | + def protocol_handler_class(self): |
| 1765 | + """ |
| 1766 | + Specifies a protocol handler that will be used for client-initiated requests (i.e. no |
| 1767 | + internal driver requests). This can be used to override or extend features such as |
| 1768 | + message or type ser/des. |
| 1769 | +
|
| 1770 | + The default pure python implementation is :class:`cassandra.protocol.ProtocolHandler`. |
| 1771 | +
|
| 1772 | + When compiled with Cython, there are also built-in faster alternatives. See :ref:`faster_deser` |
| 1773 | + """ |
| 1774 | + return self._protocol_handler_class |
| 1775 | + |
| 1776 | + @protocol_handler_class.setter |
| 1777 | + def protocol_handler_class(self, value): |
| 1778 | + self._protocol_handler_class = value |
| 1779 | + self._protocol_handler = self._protocol_handler_class( |
| 1780 | + self._context.message_codec_registry.encoders, |
| 1781 | + self._context.message_codec_registry.decoders) |
| 1782 | + |
1760 | 1783 | def execute(self, query, parameters=None, timeout=_NOT_SET, trace=False, custom_payload=None, execution_profile=EXEC_PROFILE_DEFAULT, paging_state=None): |
1761 | 1784 | """ |
1762 | 1785 | Execute the given query and synchronously wait for the response. |
@@ -1829,7 +1852,6 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None |
1829 | 1852 |
|
1830 | 1853 | """ |
1831 | 1854 | future = self._create_response_future(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state) |
1832 | | - future._protocol_handler = self.client_protocol_handler |
1833 | 1855 | self._on_request(future) |
1834 | 1856 | future.send_request() |
1835 | 1857 | return future |
@@ -3074,13 +3096,14 @@ class ResponseFuture(object): |
3074 | 3096 | _custom_payload = None |
3075 | 3097 | _warnings = None |
3076 | 3098 | _timer = None |
3077 | | - _protocol_handler = ProtocolHandler |
| 3099 | + |
3078 | 3100 | _spec_execution_plan = NoSpeculativeExecutionPlan() |
3079 | 3101 |
|
3080 | 3102 | _warned_timeout = False |
3081 | 3103 |
|
3082 | 3104 | def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None, |
3083 | | - retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None, speculative_execution_plan=None): |
| 3105 | + retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None, |
| 3106 | + speculative_execution_plan=None): |
3084 | 3107 | self.session = session |
3085 | 3108 | # TODO: normalize handling of retry policy and row factory |
3086 | 3109 | self.row_factory = row_factory or session.cluster._default_row_factory |
@@ -3237,10 +3260,9 @@ def _query(self, host, message=None, cb=None): |
3237 | 3260 | if cb is None: |
3238 | 3261 | cb = partial(self._set_result, host, connection, pool) |
3239 | 3262 |
|
3240 | | - self.request_encoded_size = connection.send_msg(message, request_id, cb=cb, |
3241 | | - encoder=self._protocol_handler.encode_message, |
3242 | | - decoder=self._protocol_handler.decode_message, |
3243 | | - result_metadata=result_meta) |
| 3263 | + self.request_encoded_size = connection.send_msg( |
| 3264 | + message, request_id, cb=cb, result_metadata=result_meta, |
| 3265 | + protocol_handler=self.session._protocol_handler) |
3244 | 3266 | self.attempted_hosts.append(host) |
3245 | 3267 | return request_id |
3246 | 3268 | except NoConnectionsAvailable as exc: |
|
0 commit comments