Skip to content

Commit d4bf402

Browse files
committed
Cluster config to disable schema and token processing
PYTHON-327
1 parent 2eec974 commit d4bf402

3 files changed

Lines changed: 100 additions & 42 deletions

File tree

cassandra/cluster.py

Lines changed: 90 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,37 @@ def auth_provider(self, value):
481481
establishment, options passing, and authentication.
482482
"""
483483

484+
@property
485+
def schema_metadata_enabled(self):
486+
"""
487+
Flag indicating whether internal schema metadata is updated.
488+
489+
When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This
490+
can be used to speed up the initial connection and reduce load on client and server during operation. Turning this off
491+
gives up token-aware request routing and programmatic inspection of the metadata model.
492+
"""
493+
return self.control_connection._schema_meta_enabled
494+
495+
@schema_metadata_enabled.setter
496+
def schema_metadata_enabled(self, enabled):
497+
self.control_connection._schema_meta_enabled = bool(enabled)
498+
499+
@property
500+
def token_metadata_enabled(self):
501+
"""
502+
Flag indicating whether internal token metadata is updated.
503+
504+
When disabled, the driver does not query node token information on connect, or on topology change events. This
505+
can be used to speed up the initial connection and reduce load on client and server during operation. It is most useful
506+
in large clusters using vnodes, where the token map can be expensive to compute. Turning this off
507+
gives up token-aware request routing and programmatic inspection of the token ring.
508+
"""
509+
return self.control_connection._token_meta_enabled
510+
511+
@token_metadata_enabled.setter
512+
def token_metadata_enabled(self, enabled):
513+
self.control_connection._token_meta_enabled = bool(enabled)
514+
484515
sessions = None
485516
control_connection = None
486517
scheduler = None
@@ -520,7 +551,9 @@ def __init__(self,
520551
idle_heartbeat_interval=30,
521552
schema_event_refresh_window=2,
522553
topology_event_refresh_window=10,
523-
connect_timeout=5):
554+
connect_timeout=5,
555+
schema_metadata_enabled=True,
556+
token_metadata_enabled=True):
524557
"""
525558
Any of the mutable Cluster attributes may be set as keyword arguments
526559
to the constructor.
@@ -619,7 +652,9 @@ def __init__(self,
619652

620653
self.control_connection = ControlConnection(
621654
self, self.control_connection_timeout,
622-
self.schema_event_refresh_window, self.topology_event_refresh_window)
655+
self.schema_event_refresh_window, self.topology_event_refresh_window,
656+
schema_metadata_enabled, token_metadata_enabled)
657+
623658

624659
def register_user_type(self, keyspace, user_type, klass):
625660
"""
@@ -1244,7 +1279,7 @@ def refresh_schema_metadata(self, max_schema_agreement_wait=None):
12441279
12451280
An Exception is raised if schema refresh fails for any reason.
12461281
"""
1247-
if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait):
1282+
if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait, force=True):
12481283
raise Exception("Schema metadata was not refreshed. See log for details.")
12491284

12501285
def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
@@ -1255,7 +1290,7 @@ def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
12551290
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12561291
"""
12571292
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.KEYSPACE, keyspace=keyspace,
1258-
schema_agreement_wait=max_schema_agreement_wait):
1293+
schema_agreement_wait=max_schema_agreement_wait, force=True):
12591294
raise Exception("Keyspace metadata was not refreshed. See log for details.")
12601295

12611296
def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None):
@@ -1265,7 +1300,8 @@ def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None
12651300
12661301
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12671302
"""
1268-
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table, schema_agreement_wait=max_schema_agreement_wait):
1303+
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table,
1304+
schema_agreement_wait=max_schema_agreement_wait, force=True):
12691305
raise Exception("Table metadata was not refreshed. See log for details.")
12701306

12711307
def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreement_wait=None):
@@ -1274,7 +1310,8 @@ def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreemen
12741310
12751311
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12761312
"""
1277-
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view, schema_agreement_wait=max_schema_agreement_wait):
1313+
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view,
1314+
schema_agreement_wait=max_schema_agreement_wait, force=True):
12781315
raise Exception("View metadata was not refreshed. See log for details.")
12791316

12801317
def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_wait=None):
@@ -1283,7 +1320,8 @@ def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_w
12831320
12841321
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12851322
"""
1286-
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type, schema_agreement_wait=max_schema_agreement_wait):
1323+
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type,
1324+
schema_agreement_wait=max_schema_agreement_wait, force=True):
12871325
raise Exception("User Type metadata was not refreshed. See log for details.")
12881326

12891327
def refresh_user_function_metadata(self, keyspace, function, max_schema_agreement_wait=None):
@@ -1294,7 +1332,8 @@ def refresh_user_function_metadata(self, keyspace, function, max_schema_agreemen
12941332
12951333
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12961334
"""
1297-
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function, schema_agreement_wait=max_schema_agreement_wait):
1335+
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function,
1336+
schema_agreement_wait=max_schema_agreement_wait, force=True):
12981337
raise Exception("User Function metadata was not refreshed. See log for details.")
12991338

13001339
def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreement_wait=None):
@@ -1305,7 +1344,8 @@ def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreem
13051344
13061345
See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
13071346
"""
1308-
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate, schema_agreement_wait=max_schema_agreement_wait):
1347+
if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate,
1348+
schema_agreement_wait=max_schema_agreement_wait, force=True):
13091349
raise Exception("User Aggregate metadata was not refreshed. See log for details.")
13101350

13111351
def refresh_nodes(self):
@@ -1319,6 +1359,8 @@ def refresh_nodes(self):
13191359

13201360
def set_meta_refresh_enabled(self, enabled):
13211361
"""
1362+
*Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` and :attr:`~.Cluster.token_metadata_enabled` instead
1363+
13221364
Sets a flag to enable (True) or disable (False) all metadata refresh queries.
13231365
This applies to both schema and node topology.
13241366
@@ -1327,7 +1369,8 @@ def set_meta_refresh_enabled(self, enabled):
13271369
Meta refresh must be enabled for the driver to become aware of any cluster
13281370
topology changes or schema updates.
13291371
"""
1330-
self.control_connection.set_meta_refresh_enabled(bool(enabled))
1372+
self.schema_metadata_enabled = enabled
1373+
self.token_metadata_enabled = enabled
13311374

13321375
def _prepare_all_queries(self, host):
13331376
if not self._prepared_statements:
@@ -2010,7 +2053,10 @@ class ControlConnection(object):
20102053
"""
20112054

20122055
_SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
2056+
_SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, schema_version FROM system.peers"
20132057
_SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2058+
_SELECT_LOCAL_NO_TOKENS = "SELECT cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2059+
20142060

20152061
_SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers"
20162062
_SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
@@ -2022,14 +2068,17 @@ class ControlConnection(object):
20222068
_schema_event_refresh_window = None
20232069
_topology_event_refresh_window = None
20242070

2025-
_meta_refresh_enabled = True
2071+
_schema_meta_enabled = True
2072+
_token_meta_enabled = True
20262073

20272074
# for testing purposes
20282075
_time = time
20292076

20302077
def __init__(self, cluster, timeout,
20312078
schema_event_refresh_window,
2032-
topology_event_refresh_window):
2079+
topology_event_refresh_window,
2080+
schema_meta_enabled=True,
2081+
token_meta_enabled=True):
20332082
# use a weak reference to allow the Cluster instance to be GC'ed (and
20342083
# shutdown) since implementing __del__ disables the cycle detector
20352084
self._cluster = weakref.proxy(cluster)
@@ -2038,6 +2087,8 @@ def __init__(self, cluster, timeout,
20382087

20392088
self._schema_event_refresh_window = schema_event_refresh_window
20402089
self._topology_event_refresh_window = topology_event_refresh_window
2090+
self._schema_meta_enabled = schema_meta_enabled
2091+
self._token_meta_enabled = token_meta_enabled
20412092

20422093
self._lock = RLock()
20432094
self._schema_agreement_lock = Lock()
@@ -2119,8 +2170,10 @@ def _try_connect(self, host):
21192170
"SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
21202171
}, register_timeout=self._timeout)
21212172

2122-
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=ConsistencyLevel.ONE)
2123-
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=ConsistencyLevel.ONE)
2173+
sel_peers = self._SELECT_PEERS if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS
2174+
sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
2175+
peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
2176+
local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
21242177
shared_results = connection.wait_for_responses(
21252178
peers_query, local_query, timeout=self._timeout)
21262179

@@ -2200,28 +2253,29 @@ def shutdown(self):
22002253
self._connection.close()
22012254
del self._connection
22022255

2203-
def refresh_schema(self, **kwargs):
2204-
if not self._meta_refresh_enabled:
2205-
log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
2206-
return False
2207-
2256+
def refresh_schema(self, force=False, **kwargs):
22082257
try:
22092258
if self._connection:
2210-
return self._refresh_schema(self._connection, **kwargs)
2259+
return self._refresh_schema(self._connection, force=force, **kwargs)
22112260
except ReferenceError:
22122261
pass # our weak reference to the Cluster is no good
22132262
except Exception:
22142263
log.debug("[control connection] Error refreshing schema", exc_info=True)
22152264
self._signal_error()
22162265
return False
22172266

2218-
def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, **kwargs):
2267+
def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, force=False, **kwargs):
22192268
if self._cluster.is_shutdown:
22202269
return False
22212270

22222271
agreed = self.wait_for_schema_agreement(connection,
22232272
preloaded_results=preloaded_results,
22242273
wait_time=schema_agreement_wait)
2274+
2275+
if not self._schema_meta_enabled and not force:
2276+
log.debug("[control connection] Skipping schema refresh because schema metadata is disabled")
2277+
return False
2278+
22252279
if not agreed:
22262280
log.debug("Skipping schema refresh due to lack of schema agreement")
22272281
return False
@@ -2231,10 +2285,6 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
22312285
return True
22322286

22332287
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
2234-
if not self._meta_refresh_enabled:
2235-
log.debug("[control connection] Skipping node list refresh because meta refresh is disabled")
2236-
return False
2237-
22382288
try:
22392289
if self._connection:
22402290
self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
@@ -2254,10 +2304,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
22542304
peers_result = preloaded_results[0]
22552305
local_result = preloaded_results[1]
22562306
else:
2257-
log.debug("[control connection] Refreshing node list and token map")
22582307
cl = ConsistencyLevel.ONE
2259-
peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=cl)
2260-
local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=cl)
2308+
if not self._token_meta_enabled:
2309+
log.debug("[control connection] Refreshing node list without token map")
2310+
sel_peers = self._SELECT_PEERS_NO_TOKENS
2311+
sel_local = self._SELECT_LOCAL_NO_TOKENS
2312+
else:
2313+
log.debug("[control connection] Refreshing node list and token map")
2314+
sel_peers = self._SELECT_PEERS
2315+
sel_local = self._SELECT_LOCAL
2316+
peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
2317+
local_query = QueryMessage(query=sel_local, consistency_level=cl)
22612318
peers_result, local_result = connection.wait_for_responses(
22622319
peers_query, local_query, timeout=self._timeout)
22632320

@@ -2296,8 +2353,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
22962353
if not addr or addr in ["0.0.0.0", "::"]:
22972354
addr = row.get("peer")
22982355

2299-
tokens = row.get("tokens")
2300-
if not tokens:
2356+
tokens = row.get("tokens", None)
2357+
if 'tokens' in row and not tokens: # it was selected, but empty
23012358
log.warning("Excluding host (%s) with no tokens in system.peers table of %s." % (addr, connection.host))
23022359
continue
23032360

@@ -2529,9 +2586,6 @@ def return_connection(self, connection):
25292586
if connection is self._connection and (connection.is_defunct or connection.is_closed):
25302587
self.reconnect()
25312588

2532-
def set_meta_refresh_enabled(self, enabled):
2533-
self._meta_refresh_enabled = enabled
2534-
25352589

25362590
def _stop_scheduler(scheduler, thread):
25372591
try:
@@ -2626,13 +2680,9 @@ def _log_if_failed(self, future):
26262680

26272681
def refresh_schema_and_set_result(control_conn, response_future, **kwargs):
26282682
try:
2629-
if control_conn._meta_refresh_enabled:
2630-
log.debug("Refreshing schema in response to schema change. "
2631-
"%s", kwargs)
2632-
response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs)
2633-
else:
2634-
log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
2635-
"%s", kwargs)
2683+
log.debug("Refreshing schema in response to schema change. "
2684+
"%s", kwargs)
2685+
response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs)
26362686
except Exception:
26372687
log.exception("Exception refreshing schema in response to schema change:")
26382688
response_future.session.submit(control_conn.refresh_schema, **kwargs)

cassandra/metadata.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,8 +1402,10 @@ def rebuild_keyspace(self, keyspace, build_if_absent=False):
14021402
with self._rebuild_lock:
14031403
current = self.tokens_to_hosts_by_ks.get(keyspace, None)
14041404
if (build_if_absent and current is None) or (not build_if_absent and current is not None):
1405-
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
1406-
self.tokens_to_hosts_by_ks[keyspace] = replica_map
1405+
ks_meta = self._metadata.keyspaces.get(keyspace)
1406+
if ks_meta:
1407+
replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace])
1408+
self.tokens_to_hosts_by_ks[keyspace] = replica_map
14071409

14081410
def replica_map_for_keyspace(self, ks_metadata):
14091411
strategy = ks_metadata.replication_strategy

docs/api/cassandra/cluster.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@
4949

5050
.. autoattribute:: connect_timeout
5151

52+
.. autoattribute:: schema_metadata_enabled
53+
:annotation: = True
54+
55+
.. autoattribute:: token_metadata_enabled
56+
:annotation: = True
57+
5258
.. automethod:: connect
5359

5460
.. automethod:: shutdown

0 commit comments

Comments
 (0)