@@ -481,6 +481,37 @@ def auth_provider(self, value):
481481 establishment, options passing, and authentication.
482482 """
483483
484+ @property
485+ def schema_metadata_enabled (self ):
486+ """
487+ Flag indicating whether internal schema metadata is updated.
488+
489+ When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This
490+ can be used to speed initial connection, and reduce load on client and server during operation. Turning this off
491+ gives away token aware request routing, and programmatic inspection of the metadata model.
492+ """
493+ return self .control_connection ._schema_meta_enabled
494+
495+ @schema_metadata_enabled .setter
496+ def schema_metadata_enabled (self , enabled ):
497+ self .control_connection ._schema_meta_enabled = bool (enabled )
498+
499+ @property
500+ def token_metadata_enabled (self ):
501+ """
502+ Flag indicating whether internal token metadata is updated.
503+
504+ When disabled, the driver does not query node token information on connect, or on topology change events. This
505+ can be used to speed initial connection, and reduce load on client and server during operation. It is most useful
506+ in large clusters using vnodes, where the token map can be expensive to compute. Turning this off
507+ gives away token aware request routing, and programmatic inspection of the token ring.
508+ """
509+ return self .control_connection ._token_meta_enabled
510+
511+ @token_metadata_enabled .setter
512+ def token_metadata_enabled (self , enabled ):
513+ self .control_connection ._token_meta_enabled = bool (enabled )
514+
484515 sessions = None
485516 control_connection = None
486517 scheduler = None
@@ -520,7 +551,9 @@ def __init__(self,
520551 idle_heartbeat_interval = 30 ,
521552 schema_event_refresh_window = 2 ,
522553 topology_event_refresh_window = 10 ,
523- connect_timeout = 5 ):
554+ connect_timeout = 5 ,
555+ schema_metadata_enabled = True ,
556+ token_metadata_enabled = True ):
524557 """
525558 Any of the mutable Cluster attributes may be set as keyword arguments
526559 to the constructor.
@@ -619,7 +652,9 @@ def __init__(self,
619652
620653 self .control_connection = ControlConnection (
621654 self , self .control_connection_timeout ,
622- self .schema_event_refresh_window , self .topology_event_refresh_window )
655+ self .schema_event_refresh_window , self .topology_event_refresh_window ,
656+ schema_metadata_enabled , token_metadata_enabled )
657+
623658
624659 def register_user_type (self , keyspace , user_type , klass ):
625660 """
@@ -1244,7 +1279,7 @@ def refresh_schema_metadata(self, max_schema_agreement_wait=None):
12441279
12451280 An Exception is raised if schema refresh fails for any reason.
12461281 """
1247- if not self .control_connection .refresh_schema (schema_agreement_wait = max_schema_agreement_wait ):
1282+ if not self .control_connection .refresh_schema (schema_agreement_wait = max_schema_agreement_wait , force = True ):
12481283 raise Exception ("Schema metadata was not refreshed. See log for details." )
12491284
12501285 def refresh_keyspace_metadata (self , keyspace , max_schema_agreement_wait = None ):
@@ -1255,7 +1290,7 @@ def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
12551290 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12561291 """
12571292 if not self .control_connection .refresh_schema (target_type = SchemaTargetType .KEYSPACE , keyspace = keyspace ,
1258- schema_agreement_wait = max_schema_agreement_wait ):
1293+ schema_agreement_wait = max_schema_agreement_wait , force = True ):
12591294 raise Exception ("Keyspace metadata was not refreshed. See log for details." )
12601295
12611296 def refresh_table_metadata (self , keyspace , table , max_schema_agreement_wait = None ):
@@ -1265,7 +1300,8 @@ def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None
12651300
12661301 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12671302 """
1268- if not self .control_connection .refresh_schema (target_type = SchemaTargetType .TABLE , keyspace = keyspace , table = table , schema_agreement_wait = max_schema_agreement_wait ):
1303+ if not self .control_connection .refresh_schema (target_type = SchemaTargetType .TABLE , keyspace = keyspace , table = table ,
1304+ schema_agreement_wait = max_schema_agreement_wait , force = True ):
12691305 raise Exception ("Table metadata was not refreshed. See log for details." )
12701306
12711307 def refresh_materialized_view_metadata (self , keyspace , view , max_schema_agreement_wait = None ):
@@ -1274,7 +1310,8 @@ def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreemen
12741310
12751311 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12761312 """
1277- if not self .control_connection .refresh_schema (target_type = SchemaTargetType .TABLE , keyspace = keyspace , table = view , schema_agreement_wait = max_schema_agreement_wait ):
1313+ if not self .control_connection .refresh_schema (target_type = SchemaTargetType .TABLE , keyspace = keyspace , table = view ,
1314+ schema_agreement_wait = max_schema_agreement_wait , force = True ):
12781315 raise Exception ("View metadata was not refreshed. See log for details." )
12791316
12801317 def refresh_user_type_metadata (self , keyspace , user_type , max_schema_agreement_wait = None ):
@@ -1283,7 +1320,8 @@ def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_w
12831320
12841321 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12851322 """
1286- if not self .control_connection .refresh_schema (target_type = SchemaTargetType .TYPE , keyspace = keyspace , type = user_type , schema_agreement_wait = max_schema_agreement_wait ):
1323+ if not self .control_connection .refresh_schema (target_type = SchemaTargetType .TYPE , keyspace = keyspace , type = user_type ,
1324+ schema_agreement_wait = max_schema_agreement_wait , force = True ):
12871325 raise Exception ("User Type metadata was not refreshed. See log for details." )
12881326
12891327 def refresh_user_function_metadata (self , keyspace , function , max_schema_agreement_wait = None ):
@@ -1294,7 +1332,8 @@ def refresh_user_function_metadata(self, keyspace, function, max_schema_agreemen
12941332
12951333 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12961334 """
1297- if not self .control_connection .refresh_schema (target_type = SchemaTargetType .FUNCTION , keyspace = keyspace , function = function , schema_agreement_wait = max_schema_agreement_wait ):
1335+ if not self .control_connection .refresh_schema (target_type = SchemaTargetType .FUNCTION , keyspace = keyspace , function = function ,
1336+ schema_agreement_wait = max_schema_agreement_wait , force = True ):
12981337 raise Exception ("User Function metadata was not refreshed. See log for details." )
12991338
13001339 def refresh_user_aggregate_metadata (self , keyspace , aggregate , max_schema_agreement_wait = None ):
@@ -1305,7 +1344,8 @@ def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreem
13051344
13061345 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
13071346 """
1308- if not self .control_connection .refresh_schema (target_type = SchemaTargetType .AGGREGATE , keyspace = keyspace , aggregate = aggregate , schema_agreement_wait = max_schema_agreement_wait ):
1347+ if not self .control_connection .refresh_schema (target_type = SchemaTargetType .AGGREGATE , keyspace = keyspace , aggregate = aggregate ,
1348+ schema_agreement_wait = max_schema_agreement_wait , force = True ):
13091349 raise Exception ("User Aggregate metadata was not refreshed. See log for details." )
13101350
13111351 def refresh_nodes (self ):
@@ -1319,6 +1359,8 @@ def refresh_nodes(self):
13191359
13201360 def set_meta_refresh_enabled (self , enabled ):
13211361 """
1362+ *Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` :attr:`~.Cluster.token_metadata_enabled` instead
1363+
13221364 Sets a flag to enable (True) or disable (False) all metadata refresh queries.
13231365 This applies to both schema and node topology.
13241366
@@ -1327,7 +1369,8 @@ def set_meta_refresh_enabled(self, enabled):
13271369 Meta refresh must be enabled for the driver to become aware of any cluster
13281370 topology changes or schema updates.
13291371 """
1330- self .control_connection .set_meta_refresh_enabled (bool (enabled ))
1372+ self .schema_metadata_enabled = enabled
1373+ self .token_metadata_enabled = enabled
13311374
13321375 def _prepare_all_queries (self , host ):
13331376 if not self ._prepared_statements :
@@ -2010,7 +2053,10 @@ class ControlConnection(object):
20102053 """
20112054
20122055 _SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
2056+ _SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, schema_version FROM system.peers"
20132057 _SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2058+ _SELECT_LOCAL_NO_TOKENS = "SELECT cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2059+
20142060
20152061 _SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers"
20162062 _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
@@ -2022,14 +2068,17 @@ class ControlConnection(object):
20222068 _schema_event_refresh_window = None
20232069 _topology_event_refresh_window = None
20242070
2025- _meta_refresh_enabled = True
2071+ _schema_meta_enabled = True
2072+ _token_meta_enabled = True
20262073
20272074 # for testing purposes
20282075 _time = time
20292076
20302077 def __init__ (self , cluster , timeout ,
20312078 schema_event_refresh_window ,
2032- topology_event_refresh_window ):
2079+ topology_event_refresh_window ,
2080+ schema_meta_enabled = True ,
2081+ token_meta_enabled = True ):
20332082 # use a weak reference to allow the Cluster instance to be GC'ed (and
20342083 # shutdown) since implementing __del__ disables the cycle detector
20352084 self ._cluster = weakref .proxy (cluster )
@@ -2038,6 +2087,8 @@ def __init__(self, cluster, timeout,
20382087
20392088 self ._schema_event_refresh_window = schema_event_refresh_window
20402089 self ._topology_event_refresh_window = topology_event_refresh_window
2090+ self ._schema_meta_enabled = schema_meta_enabled
2091+ self ._token_meta_enabled = token_meta_enabled
20412092
20422093 self ._lock = RLock ()
20432094 self ._schema_agreement_lock = Lock ()
@@ -2119,8 +2170,10 @@ def _try_connect(self, host):
21192170 "SCHEMA_CHANGE" : partial (_watch_callback , self_weakref , '_handle_schema_change' )
21202171 }, register_timeout = self ._timeout )
21212172
2122- peers_query = QueryMessage (query = self ._SELECT_PEERS , consistency_level = ConsistencyLevel .ONE )
2123- local_query = QueryMessage (query = self ._SELECT_LOCAL , consistency_level = ConsistencyLevel .ONE )
2173+ sel_peers = self ._SELECT_PEERS if self ._token_meta_enabled else self ._SELECT_PEERS_NO_TOKENS
2174+ sel_local = self ._SELECT_LOCAL if self ._token_meta_enabled else self ._SELECT_LOCAL_NO_TOKENS
2175+ peers_query = QueryMessage (query = sel_peers , consistency_level = ConsistencyLevel .ONE )
2176+ local_query = QueryMessage (query = sel_local , consistency_level = ConsistencyLevel .ONE )
21242177 shared_results = connection .wait_for_responses (
21252178 peers_query , local_query , timeout = self ._timeout )
21262179
@@ -2200,28 +2253,29 @@ def shutdown(self):
22002253 self ._connection .close ()
22012254 del self ._connection
22022255
2203- def refresh_schema (self , ** kwargs ):
2204- if not self ._meta_refresh_enabled :
2205- log .debug ("[control connection] Skipping schema refresh because meta refresh is disabled" )
2206- return False
2207-
2256+ def refresh_schema (self , force = False , ** kwargs ):
22082257 try :
22092258 if self ._connection :
2210- return self ._refresh_schema (self ._connection , ** kwargs )
2259+ return self ._refresh_schema (self ._connection , force = force , ** kwargs )
22112260 except ReferenceError :
22122261 pass # our weak reference to the Cluster is no good
22132262 except Exception :
22142263 log .debug ("[control connection] Error refreshing schema" , exc_info = True )
22152264 self ._signal_error ()
22162265 return False
22172266
2218- def _refresh_schema (self , connection , preloaded_results = None , schema_agreement_wait = None , ** kwargs ):
2267+ def _refresh_schema (self , connection , preloaded_results = None , schema_agreement_wait = None , force = False , ** kwargs ):
22192268 if self ._cluster .is_shutdown :
22202269 return False
22212270
22222271 agreed = self .wait_for_schema_agreement (connection ,
22232272 preloaded_results = preloaded_results ,
22242273 wait_time = schema_agreement_wait )
2274+
2275+ if not self ._schema_meta_enabled and not force :
2276+ log .debug ("[control connection] Skipping schema refresh because schema metadata is disabled" )
2277+ return False
2278+
22252279 if not agreed :
22262280 log .debug ("Skipping schema refresh due to lack of schema agreement" )
22272281 return False
@@ -2231,10 +2285,6 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
22312285 return True
22322286
22332287 def refresh_node_list_and_token_map (self , force_token_rebuild = False ):
2234- if not self ._meta_refresh_enabled :
2235- log .debug ("[control connection] Skipping node list refresh because meta refresh is disabled" )
2236- return False
2237-
22382288 try :
22392289 if self ._connection :
22402290 self ._refresh_node_list_and_token_map (self ._connection , force_token_rebuild = force_token_rebuild )
@@ -2254,10 +2304,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
22542304 peers_result = preloaded_results [0 ]
22552305 local_result = preloaded_results [1 ]
22562306 else :
2257- log .debug ("[control connection] Refreshing node list and token map" )
22582307 cl = ConsistencyLevel .ONE
2259- peers_query = QueryMessage (query = self ._SELECT_PEERS , consistency_level = cl )
2260- local_query = QueryMessage (query = self ._SELECT_LOCAL , consistency_level = cl )
2308+ if not self ._token_meta_enabled :
2309+ log .debug ("[control connection] Refreshing node list without token map" )
2310+ sel_peers = self ._SELECT_PEERS_NO_TOKENS
2311+ sel_local = self ._SELECT_LOCAL_NO_TOKENS
2312+ else :
2313+ log .debug ("[control connection] Refreshing node list and token map" )
2314+ sel_peers = self ._SELECT_PEERS
2315+ sel_local = self ._SELECT_LOCAL
2316+ peers_query = QueryMessage (query = sel_peers , consistency_level = cl )
2317+ local_query = QueryMessage (query = sel_local , consistency_level = cl )
22612318 peers_result , local_result = connection .wait_for_responses (
22622319 peers_query , local_query , timeout = self ._timeout )
22632320
@@ -2296,8 +2353,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
22962353 if not addr or addr in ["0.0.0.0" , "::" ]:
22972354 addr = row .get ("peer" )
22982355
2299- tokens = row .get ("tokens" )
2300- if not tokens :
2356+ tokens = row .get ("tokens" , None )
2357+ if 'tokens' in row and not tokens : # it was selected, but empty
23012358 log .warning ("Excluding host (%s) with no tokens in system.peers table of %s." % (addr , connection .host ))
23022359 continue
23032360
@@ -2529,9 +2586,6 @@ def return_connection(self, connection):
25292586 if connection is self ._connection and (connection .is_defunct or connection .is_closed ):
25302587 self .reconnect ()
25312588
2532- def set_meta_refresh_enabled (self , enabled ):
2533- self ._meta_refresh_enabled = enabled
2534-
25352589
25362590def _stop_scheduler (scheduler , thread ):
25372591 try :
@@ -2626,13 +2680,9 @@ def _log_if_failed(self, future):
26262680
26272681def refresh_schema_and_set_result (control_conn , response_future , ** kwargs ):
26282682 try :
2629- if control_conn ._meta_refresh_enabled :
2630- log .debug ("Refreshing schema in response to schema change. "
2631- "%s" , kwargs )
2632- response_future .is_schema_agreed = control_conn ._refresh_schema (response_future ._connection , ** kwargs )
2633- else :
2634- log .debug ("Skipping schema refresh in response to schema change because meta refresh is disabled; "
2635- "%s" , kwargs )
2683+ log .debug ("Refreshing schema in response to schema change. "
2684+ "%s" , kwargs )
2685+ response_future .is_schema_agreed = control_conn ._refresh_schema (response_future ._connection , ** kwargs )
26362686 except Exception :
26372687 log .exception ("Exception refreshing schema in response to schema change:" )
26382688 response_future .session .submit (control_conn .refresh_schema , ** kwargs )
0 commit comments