@@ -481,6 +481,37 @@ def auth_provider(self, value):
481481 establishment, options passing, and authentication.
482482 """
483483
484+ @property
485+ def schema_metadata_enabled(self):
486+ """
487+ Flag indicating whether internal schema metadata is updated.
488+
489+ When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This
490+ can be used to speed initial connection, and reduce load on client and server during operation. Turning this off
491+ gives away token aware request routing, and programmatic inspection of the metadata model.
492+ """
493+ return self.control_connection._schema_meta_enabled
494+
495+ @schema_metadata_enabled.setter
496+ def schema_metadata_enabled(self, enabled):
497+ self.control_connection._schema_meta_enabled = bool(enabled)
498+
499+ @property
500+ def token_metadata_enabled(self):
501+ """
502+ Flag indicating whether internal token metadata is updated.
503+
504+ When disabled, the driver does not query node token information on connect, or on topology change events. This
505+ can be used to speed initial connection, and reduce load on client and server during operation. It is most useful
506+ in large clusters using vnodes, where the token map can be expensive to compute. Turning this off
507+ gives away token aware request routing, and programmatic inspection of the token ring.
508+ """
509+ return self.control_connection._token_meta_enabled
510+
511+ @token_metadata_enabled.setter
512+ def token_metadata_enabled(self, enabled):
513+ self.control_connection._token_meta_enabled = bool(enabled)
514+
484515 sessions = None
485516 control_connection = None
486517 scheduler = None
@@ -520,7 +551,9 @@ def __init__(self,
520551 idle_heartbeat_interval=30,
521552 schema_event_refresh_window=2,
522553 topology_event_refresh_window=10,
523- connect_timeout=5):
554+ connect_timeout=5,
555+ schema_metadata_enabled=True,
556+ token_metadata_enabled=True):
524557 """
525558 Any of the mutable Cluster attributes may be set as keyword arguments
526559 to the constructor.
@@ -619,7 +652,9 @@ def __init__(self,
619652
620653 self.control_connection = ControlConnection(
621654 self, self.control_connection_timeout,
622- self.schema_event_refresh_window, self.topology_event_refresh_window)
655+ self.schema_event_refresh_window, self.topology_event_refresh_window,
656+ schema_metadata_enabled, token_metadata_enabled)
657+
623658
624659 def register_user_type(self, keyspace, user_type, klass):
625660 """
@@ -1244,7 +1279,7 @@ def refresh_schema_metadata(self, max_schema_agreement_wait=None):
12441279
12451280 An Exception is raised if schema refresh fails for any reason.
12461281 """
1247- if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait):
1282+ if not self.control_connection.refresh_schema(schema_agreement_wait=max_schema_agreement_wait, force=True ):
12481283 raise Exception("Schema metadata was not refreshed. See log for details.")
12491284
12501285 def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
@@ -1255,7 +1290,7 @@ def refresh_keyspace_metadata(self, keyspace, max_schema_agreement_wait=None):
12551290 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12561291 """
12571292 if not self.control_connection.refresh_schema(target_type=SchemaTargetType.KEYSPACE, keyspace=keyspace,
1258- schema_agreement_wait=max_schema_agreement_wait):
1293+ schema_agreement_wait=max_schema_agreement_wait, force=True ):
12591294 raise Exception("Keyspace metadata was not refreshed. See log for details.")
12601295
12611296 def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None):
@@ -1265,7 +1300,8 @@ def refresh_table_metadata(self, keyspace, table, max_schema_agreement_wait=None
12651300
12661301 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12671302 """
1268- if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table, schema_agreement_wait=max_schema_agreement_wait):
1303+ if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=table,
1304+ schema_agreement_wait=max_schema_agreement_wait, force=True):
12691305 raise Exception("Table metadata was not refreshed. See log for details.")
12701306
12711307 def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreement_wait=None):
@@ -1274,7 +1310,8 @@ def refresh_materialized_view_metadata(self, keyspace, view, max_schema_agreemen
12741310
12751311 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12761312 """
1277- if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view, schema_agreement_wait=max_schema_agreement_wait):
1313+ if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TABLE, keyspace=keyspace, table=view,
1314+ schema_agreement_wait=max_schema_agreement_wait, force=True):
12781315 raise Exception("View metadata was not refreshed. See log for details.")
12791316
12801317 def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_wait=None):
@@ -1283,7 +1320,8 @@ def refresh_user_type_metadata(self, keyspace, user_type, max_schema_agreement_w
12831320
12841321 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12851322 """
1286- if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type, schema_agreement_wait=max_schema_agreement_wait):
1323+ if not self.control_connection.refresh_schema(target_type=SchemaTargetType.TYPE, keyspace=keyspace, type=user_type,
1324+ schema_agreement_wait=max_schema_agreement_wait, force=True):
12871325 raise Exception("User Type metadata was not refreshed. See log for details.")
12881326
12891327 def refresh_user_function_metadata(self, keyspace, function, max_schema_agreement_wait=None):
@@ -1294,7 +1332,8 @@ def refresh_user_function_metadata(self, keyspace, function, max_schema_agreemen
12941332
12951333 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
12961334 """
1297- if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function, schema_agreement_wait=max_schema_agreement_wait):
1335+ if not self.control_connection.refresh_schema(target_type=SchemaTargetType.FUNCTION, keyspace=keyspace, function=function,
1336+ schema_agreement_wait=max_schema_agreement_wait, force=True):
12981337 raise Exception("User Function metadata was not refreshed. See log for details.")
12991338
13001339 def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreement_wait=None):
@@ -1305,7 +1344,8 @@ def refresh_user_aggregate_metadata(self, keyspace, aggregate, max_schema_agreem
13051344
13061345 See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
13071346 """
1308- if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate, schema_agreement_wait=max_schema_agreement_wait):
1347+ if not self.control_connection.refresh_schema(target_type=SchemaTargetType.AGGREGATE, keyspace=keyspace, aggregate=aggregate,
1348+ schema_agreement_wait=max_schema_agreement_wait, force=True):
13091349 raise Exception("User Aggregate metadata was not refreshed. See log for details.")
13101350
13111351 def refresh_nodes(self):
@@ -1319,6 +1359,8 @@ def refresh_nodes(self):
13191359
13201360 def set_meta_refresh_enabled(self, enabled):
13211361 """
1362+ *Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` :attr:`~.Cluster.token_metadata_enabled` instead
1363+
13221364 Sets a flag to enable (True) or disable (False) all metadata refresh queries.
13231365 This applies to both schema and node topology.
13241366
@@ -1327,7 +1369,8 @@ def set_meta_refresh_enabled(self, enabled):
13271369 Meta refresh must be enabled for the driver to become aware of any cluster
13281370 topology changes or schema updates.
13291371 """
1330- self.control_connection.set_meta_refresh_enabled(bool(enabled))
1372+ self.schema_metadata_enabled = enabled
1373+ self.token_metadata_enabled = enabled
13311374
13321375 def _prepare_all_queries(self, host):
13331376 if not self._prepared_statements:
@@ -2010,7 +2053,10 @@ class ControlConnection(object):
20102053 """
20112054
20122055 _SELECT_PEERS = "SELECT peer, data_center, rack, tokens, rpc_address, schema_version FROM system.peers"
2056+ _SELECT_PEERS_NO_TOKENS = "SELECT peer, data_center, rack, rpc_address, schema_version FROM system.peers"
20132057 _SELECT_LOCAL = "SELECT cluster_name, data_center, rack, tokens, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2058+ _SELECT_LOCAL_NO_TOKENS = "SELECT cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'"
2059+
20142060
20152061 _SELECT_SCHEMA_PEERS = "SELECT peer, rpc_address, schema_version FROM system.peers"
20162062 _SELECT_SCHEMA_LOCAL = "SELECT schema_version FROM system.local WHERE key='local'"
@@ -2022,14 +2068,17 @@ class ControlConnection(object):
20222068 _schema_event_refresh_window = None
20232069 _topology_event_refresh_window = None
20242070
2025- _meta_refresh_enabled = True
2071+ _schema_meta_enabled = True
2072+ _token_meta_enabled = True
20262073
20272074 # for testing purposes
20282075 _time = time
20292076
20302077 def __init__(self, cluster, timeout,
20312078 schema_event_refresh_window,
2032- topology_event_refresh_window):
2079+ topology_event_refresh_window,
2080+ schema_meta_enabled=True,
2081+ token_meta_enabled=True):
20332082 # use a weak reference to allow the Cluster instance to be GC'ed (and
20342083 # shutdown) since implementing __del__ disables the cycle detector
20352084 self._cluster = weakref.proxy(cluster)
@@ -2038,6 +2087,8 @@ def __init__(self, cluster, timeout,
20382087
20392088 self._schema_event_refresh_window = schema_event_refresh_window
20402089 self._topology_event_refresh_window = topology_event_refresh_window
2090+ self._schema_meta_enabled = schema_meta_enabled
2091+ self._token_meta_enabled = token_meta_enabled
20412092
20422093 self._lock = RLock()
20432094 self._schema_agreement_lock = Lock()
@@ -2119,8 +2170,10 @@ def _try_connect(self, host):
21192170 "SCHEMA_CHANGE": partial(_watch_callback, self_weakref, '_handle_schema_change')
21202171 }, register_timeout=self._timeout)
21212172
2122- peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=ConsistencyLevel.ONE)
2123- local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=ConsistencyLevel.ONE)
2173+ sel_peers = self._SELECT_PEERS if self._token_meta_enabled else self._SELECT_PEERS_NO_TOKENS
2174+ sel_local = self._SELECT_LOCAL if self._token_meta_enabled else self._SELECT_LOCAL_NO_TOKENS
2175+ peers_query = QueryMessage(query=sel_peers, consistency_level=ConsistencyLevel.ONE)
2176+ local_query = QueryMessage(query=sel_local, consistency_level=ConsistencyLevel.ONE)
21242177 shared_results = connection.wait_for_responses(
21252178 peers_query, local_query, timeout=self._timeout)
21262179
@@ -2200,28 +2253,29 @@ def shutdown(self):
22002253 self._connection.close()
22012254 del self._connection
22022255
2203- def refresh_schema(self, **kwargs):
2204- if not self._meta_refresh_enabled:
2205- log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
2206- return False
2207-
2256+ def refresh_schema(self, force=False, **kwargs):
22082257 try:
22092258 if self._connection:
2210- return self._refresh_schema(self._connection, **kwargs)
2259+ return self._refresh_schema(self._connection, force=force, **kwargs)
22112260 except ReferenceError:
22122261 pass # our weak reference to the Cluster is no good
22132262 except Exception:
22142263 log.debug("[control connection] Error refreshing schema", exc_info=True)
22152264 self._signal_error()
22162265 return False
22172266
2218- def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, **kwargs):
2267+ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_wait=None, force=False, **kwargs):
22192268 if self._cluster.is_shutdown:
22202269 return False
22212270
22222271 agreed = self.wait_for_schema_agreement(connection,
22232272 preloaded_results=preloaded_results,
22242273 wait_time=schema_agreement_wait)
2274+
2275+ if not self._schema_meta_enabled and not force:
2276+ log.debug("[control connection] Skipping schema refresh because schema metadata is disabled")
2277+ return False
2278+
22252279 if not agreed:
22262280 log.debug("Skipping schema refresh due to lack of schema agreement")
22272281 return False
@@ -2231,10 +2285,6 @@ def _refresh_schema(self, connection, preloaded_results=None, schema_agreement_w
22312285 return True
22322286
22332287 def refresh_node_list_and_token_map(self, force_token_rebuild=False):
2234- if not self._meta_refresh_enabled:
2235- log.debug("[control connection] Skipping node list refresh because meta refresh is disabled")
2236- return False
2237-
22382288 try:
22392289 if self._connection:
22402290 self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
@@ -2254,10 +2304,17 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
22542304 peers_result = preloaded_results[0]
22552305 local_result = preloaded_results[1]
22562306 else:
2257- log.debug("[control connection] Refreshing node list and token map")
22582307 cl = ConsistencyLevel.ONE
2259- peers_query = QueryMessage(query=self._SELECT_PEERS, consistency_level=cl)
2260- local_query = QueryMessage(query=self._SELECT_LOCAL, consistency_level=cl)
2308+ if not self._token_meta_enabled:
2309+ log.debug("[control connection] Refreshing node list without token map")
2310+ sel_peers = self._SELECT_PEERS_NO_TOKENS
2311+ sel_local = self._SELECT_LOCAL_NO_TOKENS
2312+ else:
2313+ log.debug("[control connection] Refreshing node list and token map")
2314+ sel_peers = self._SELECT_PEERS
2315+ sel_local = self._SELECT_LOCAL
2316+ peers_query = QueryMessage(query=sel_peers, consistency_level=cl)
2317+ local_query = QueryMessage(query=sel_local, consistency_level=cl)
22612318 peers_result, local_result = connection.wait_for_responses(
22622319 peers_query, local_query, timeout=self._timeout)
22632320
@@ -2296,8 +2353,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
22962353 if not addr or addr in ["0.0.0.0", "::"]:
22972354 addr = row.get("peer")
22982355
2299- tokens = row.get("tokens")
2300- if not tokens:
2356+ tokens = row.get("tokens", None )
2357+ if 'tokens' in row and not tokens: # it was selected, but empty
23012358 log.warning("Excluding host (%s) with no tokens in system.peers table of %s." % (addr, connection.host))
23022359 continue
23032360
@@ -2529,9 +2586,6 @@ def return_connection(self, connection):
25292586 if connection is self._connection and (connection.is_defunct or connection.is_closed):
25302587 self.reconnect()
25312588
2532- def set_meta_refresh_enabled(self, enabled):
2533- self._meta_refresh_enabled = enabled
2534-
25352589
25362590def _stop_scheduler(scheduler, thread):
25372591 try:
@@ -2626,13 +2680,9 @@ def _log_if_failed(self, future):
26262680
26272681def refresh_schema_and_set_result(control_conn, response_future, **kwargs):
26282682 try:
2629- if control_conn._meta_refresh_enabled:
2630- log.debug("Refreshing schema in response to schema change. "
2631- "%s", kwargs)
2632- response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs)
2633- else:
2634- log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
2635- "%s", kwargs)
2683+ log.debug("Refreshing schema in response to schema change. "
2684+ "%s", kwargs)
2685+ response_future.is_schema_agreed = control_conn._refresh_schema(response_future._connection, **kwargs)
26362686 except Exception:
26372687 log.exception("Exception refreshing schema in response to schema change:")
26382688 response_future.session.submit(control_conn.refresh_schema, **kwargs)
0 commit comments