Skip to content

Commit 3233bf4

Browse files
committed
Minimize extra schema agreement checks
1 parent c88189d commit 3233bf4

1 file changed

Lines changed: 45 additions & 39 deletions

File tree

cassandra/cluster.py

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from concurrent.futures import ThreadPoolExecutor
77
import logging
88
import time
9-
from threading import RLock, Thread, Event
9+
from threading import Lock, RLock, Thread, Event
1010
import traceback
1111
import Queue
1212
import weakref
@@ -869,6 +869,7 @@ def __init__(self, cluster):
869869
self._connection = None
870870

871871
self._lock = RLock()
872+
self._schema_agreement_lock = Lock()
872873

873874
self._reconnection_handler = None
874875
self._reconnection_lock = RLock()
@@ -1144,45 +1145,50 @@ def _handle_schema_change(self, event):
11441145
self._cluster.executor.submit(self.refresh_schema, keyspace, table)
11451146

11461147
def wait_for_schema_agreement(self, connection=None):
1147-
log.debug("[control connection] Waiting for schema agreement")
1148-
if not connection:
1149-
connection = self._connection
1148+
# Each schema change typically generates two schema refreshes, one
1149+
# from the response type and one from the pushed notification. Holding
1150+
# a lock is just a simple way to cut down on the number of schema queries
1151+
# we'll make.
1152+
with self._schema_agreement_lock:
1153+
log.debug("[control connection] Waiting for schema agreement")
1154+
if not connection:
1155+
connection = self._connection
1156+
1157+
start = self._time.time()
1158+
elapsed = 0
1159+
cl = ConsistencyLevel.ONE
1160+
while elapsed < self._cluster.max_schema_agreement_wait:
1161+
peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl)
1162+
local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl)
1163+
peers_result, local_result = connection.wait_for_responses(peers_query, local_query)
1164+
peers_result = dict_factory(*peers_result.results)
1165+
1166+
versions = set()
1167+
if local_result.results:
1168+
local_row = dict_factory(*local_result.results)[0]
1169+
if local_row.get("schema_version"):
1170+
versions.add(local_row.get("schema_version"))
1171+
1172+
for row in peers_result:
1173+
if not row.get("rpc_address") or not row.get("schema_version"):
1174+
continue
1175+
1176+
rpc = row.get("rpc_address")
1177+
if rpc == "0.0.0.0": # TODO ipv6 check
1178+
rpc = row.get("peer")
1179+
1180+
peer = self._cluster.metadata.get_host(rpc)
1181+
if peer and peer.monitor.is_up:
1182+
versions.add(row.get("schema_version"))
1183+
1184+
if len(versions) == 1:
1185+
return True
1186+
1187+
log.debug("[control connection] Schemas mismatched, trying again")
1188+
self._time.sleep(0.2)
1189+
elapsed = self._time.time() - start
11501190

1151-
start = self._time.time()
1152-
elapsed = 0
1153-
cl = ConsistencyLevel.ONE
1154-
while elapsed < self._cluster.max_schema_agreement_wait:
1155-
peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl)
1156-
local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl)
1157-
peers_result, local_result = connection.wait_for_responses(peers_query, local_query)
1158-
peers_result = dict_factory(*peers_result.results)
1159-
1160-
versions = set()
1161-
if local_result.results:
1162-
local_row = dict_factory(*local_result.results)[0]
1163-
if local_row.get("schema_version"):
1164-
versions.add(local_row.get("schema_version"))
1165-
1166-
for row in peers_result:
1167-
if not row.get("rpc_address") or not row.get("schema_version"):
1168-
continue
1169-
1170-
rpc = row.get("rpc_address")
1171-
if rpc == "0.0.0.0": # TODO ipv6 check
1172-
rpc = row.get("peer")
1173-
1174-
peer = self._cluster.metadata.get_host(rpc)
1175-
if peer and peer.monitor.is_up:
1176-
versions.add(row.get("schema_version"))
1177-
1178-
if len(versions) == 1:
1179-
return True
1180-
1181-
log.debug("[control connection] Schemas mismatched, trying again")
1182-
self._time.sleep(0.2)
1183-
elapsed = self._time.time() - start
1184-
1185-
return False
1191+
return False
11861192

11871193
def _signal_error(self):
11881194
# try just signaling the host monitor, as this will trigger a reconnect

0 commit comments

Comments
 (0)