Skip to content

Commit 39fbdfb

Browse files
committed
Add API for disabling all meta refresh.
Also a synchronous node refresh call.
1 parent ae20e99 commit 39fbdfb

1 file changed

Lines changed: 42 additions & 6 deletions

File tree

cassandra/cluster.py

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,6 +1139,24 @@ def submit_schema_refresh(self, keyspace=None, table=None, usertype=None):
11391139
return self.executor.submit(
11401140
self.control_connection.refresh_schema, keyspace, table, usertype)
11411141

1142+
def refresh_nodes(self):
1143+
"""
1144+
Synchronously refresh the node list and token metadata
1145+
1146+
An Exception is raised if node refresh fails for any reason.
1147+
"""
1148+
if not self.control_connection.refresh_node_list_and_token_map():
1149+
raise Exception("Node list was not refreshed. See log for details.")
1150+
1151+
def set_meta_refresh_enabled(self, enabled):
1152+
"""
1153+
Sets a flag to enable (True) or disable (False) all metadata refresh queries.
1154+
This applies to both schema and node topology.
1155+
1156+
Disabling this is useful to minimize refreshes during multiple changes.
1157+
"""
1158+
self.control_connection.set_meta_refresh_enabled(bool(enabled))
1159+
11421160
def _prepare_all_queries(self, host):
11431161
if not self._prepared_statements:
11441162
return
@@ -1813,6 +1831,8 @@ class ControlConnection(object):
18131831
_schema_event_refresh_window = None
18141832
_topology_event_refresh_window = None
18151833

1834+
_meta_refresh_enabled = True
1835+
18161836
# for testing purposes
18171837
_time = time
18181838

@@ -1985,6 +2005,10 @@ def shutdown(self):
19852005

19862006
def refresh_schema(self, keyspace=None, table=None, usertype=None,
19872007
schema_agreement_wait=None):
2008+
if not self._meta_refresh_enabled:
2009+
log.debug("[control connection] Skipping schema refresh because meta refresh is disabled")
2010+
return False
2011+
19882012
try:
19892013
if self._connection:
19902014
return self._refresh_schema(self._connection, keyspace, table, usertype,
@@ -2112,14 +2136,20 @@ def _handle_results(success, result):
21122136
return True
21132137

21142138
def refresh_node_list_and_token_map(self, force_token_rebuild=False):
2139+
if not self._meta_refresh_enabled:
2140+
log.debug("[control connection] Skipping node list refresh because meta refresh is disabled")
2141+
return False
2142+
21152143
try:
21162144
if self._connection:
21172145
self._refresh_node_list_and_token_map(self._connection, force_token_rebuild=force_token_rebuild)
2146+
return True
21182147
except ReferenceError:
21192148
pass # our weak reference to the Cluster is no good
21202149
except Exception:
21212150
log.debug("[control connection] Error refreshing node list and token map", exc_info=True)
21222151
self._signal_error()
2152+
return False
21232153

21242154
def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
21252155
force_token_rebuild=False):
@@ -2387,6 +2417,9 @@ def return_connection(self, connection):
23872417
if connection is self._connection and (connection.is_defunct or connection.is_closed):
23882418
self.reconnect()
23892419

2420+
def set_meta_refresh_enabled(self, enabled):
2421+
self._meta_refresh_enabled = enabled
2422+
23902423

23912424
def _stop_scheduler(scheduler, thread):
23922425
try:
@@ -2435,16 +2468,15 @@ def schedule_unique(self, delay, fn, *args):
24352468
if task not in self._scheduled_tasks:
24362469
self._insert_task(delay, task)
24372470
else:
2438-
log.debug("Ignoring schedule_unique for already-scheduled task: %r", fn)
2439-
2471+
log.debug("Ignoring schedule_unique for already-scheduled task: %r", task)
24402472

24412473
def _insert_task(self, delay, task):
24422474
if not self.is_shutdown:
24432475
run_at = time.time() + delay
24442476
self._scheduled_tasks.add(task)
24452477
self._queue.put_nowait((run_at, task))
24462478
else:
2447-
log.debug("Ignoring scheduled function after shutdown: %r", fn)
2479+
log.debug("Ignoring scheduled task after shutdown: %r", task)
24482480

24492481
def run(self):
24502482
while True:
@@ -2480,9 +2512,13 @@ def _log_if_failed(self, future):
24802512

24812513
def refresh_schema_and_set_result(keyspace, table, usertype, control_conn, response_future):
24822514
try:
2483-
log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
2484-
keyspace, table, usertype)
2485-
control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
2515+
if control_conn._meta_refresh_enabled:
2516+
log.debug("Refreshing schema in response to schema change. Keyspace: %s; Table: %s, Type: %s",
2517+
keyspace, table, usertype)
2518+
control_conn._refresh_schema(response_future._connection, keyspace, table, usertype)
2519+
else:
2520+
log.debug("Skipping schema refresh in response to schema change because meta refresh is disabled; "
2521+
"Keyspace: %s; Table: %s, Type: %s", keyspace, table, usertype)
24862522
except Exception:
24872523
log.exception("Exception refreshing schema in response to schema change:")
24882524
response_future.session.submit(

0 commit comments

Comments
 (0)