|
6 | 6 | from concurrent.futures import ThreadPoolExecutor |
7 | 7 | import logging |
8 | 8 | import time |
9 | | -from threading import RLock, Thread, Event |
| 9 | +from threading import Lock, RLock, Thread, Event |
10 | 10 | import traceback |
11 | 11 | import Queue |
12 | 12 | import weakref |
@@ -869,6 +869,7 @@ def __init__(self, cluster): |
869 | 869 | self._connection = None |
870 | 870 |
|
871 | 871 | self._lock = RLock() |
| 872 | + self._schema_agreement_lock = Lock() |
872 | 873 |
|
873 | 874 | self._reconnection_handler = None |
874 | 875 | self._reconnection_lock = RLock() |
@@ -1144,45 +1145,50 @@ def _handle_schema_change(self, event): |
1144 | 1145 | self._cluster.executor.submit(self.refresh_schema, keyspace, table) |
1145 | 1146 |
|
1146 | 1147 | def wait_for_schema_agreement(self, connection=None): |
1147 | | - log.debug("[control connection] Waiting for schema agreement") |
1148 | | - if not connection: |
1149 | | - connection = self._connection |
| 1148 | + # Each schema change typically generates two schema refreshes, one |
| 1149 | + # from the response type and one from the pushed notification. Holding |
| 1150 | + # a lock is just a simple way to cut down on the number of schema queries |
| 1151 | + # we'll make. |
| 1152 | + with self._schema_agreement_lock: |
| 1153 | + log.debug("[control connection] Waiting for schema agreement") |
| 1154 | + if not connection: |
| 1155 | + connection = self._connection |
| 1156 | + |
| 1157 | + start = self._time.time() |
| 1158 | + elapsed = 0 |
| 1159 | + cl = ConsistencyLevel.ONE |
| 1160 | + while elapsed < self._cluster.max_schema_agreement_wait: |
| 1161 | + peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl) |
| 1162 | + local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl) |
| 1163 | + peers_result, local_result = connection.wait_for_responses(peers_query, local_query) |
| 1164 | + peers_result = dict_factory(*peers_result.results) |
| 1165 | + |
| 1166 | + versions = set() |
| 1167 | + if local_result.results: |
| 1168 | + local_row = dict_factory(*local_result.results)[0] |
| 1169 | + if local_row.get("schema_version"): |
| 1170 | + versions.add(local_row.get("schema_version")) |
| 1171 | + |
| 1172 | + for row in peers_result: |
| 1173 | + if not row.get("rpc_address") or not row.get("schema_version"): |
| 1174 | + continue |
| 1175 | + |
| 1176 | + rpc = row.get("rpc_address") |
| 1177 | + if rpc == "0.0.0.0": # TODO ipv6 check |
| 1178 | + rpc = row.get("peer") |
| 1179 | + |
| 1180 | + peer = self._cluster.metadata.get_host(rpc) |
| 1181 | + if peer and peer.monitor.is_up: |
| 1182 | + versions.add(row.get("schema_version")) |
| 1183 | + |
| 1184 | + if len(versions) == 1: |
| 1185 | + return True |
| 1186 | + |
| 1187 | + log.debug("[control connection] Schemas mismatched, trying again") |
| 1188 | + self._time.sleep(0.2) |
| 1189 | + elapsed = self._time.time() - start |
1150 | 1190 |
|
1151 | | - start = self._time.time() |
1152 | | - elapsed = 0 |
1153 | | - cl = ConsistencyLevel.ONE |
1154 | | - while elapsed < self._cluster.max_schema_agreement_wait: |
1155 | | - peers_query = QueryMessage(query=self._SELECT_SCHEMA_PEERS, consistency_level=cl) |
1156 | | - local_query = QueryMessage(query=self._SELECT_SCHEMA_LOCAL, consistency_level=cl) |
1157 | | - peers_result, local_result = connection.wait_for_responses(peers_query, local_query) |
1158 | | - peers_result = dict_factory(*peers_result.results) |
1159 | | - |
1160 | | - versions = set() |
1161 | | - if local_result.results: |
1162 | | - local_row = dict_factory(*local_result.results)[0] |
1163 | | - if local_row.get("schema_version"): |
1164 | | - versions.add(local_row.get("schema_version")) |
1165 | | - |
1166 | | - for row in peers_result: |
1167 | | - if not row.get("rpc_address") or not row.get("schema_version"): |
1168 | | - continue |
1169 | | - |
1170 | | - rpc = row.get("rpc_address") |
1171 | | - if rpc == "0.0.0.0": # TODO ipv6 check |
1172 | | - rpc = row.get("peer") |
1173 | | - |
1174 | | - peer = self._cluster.metadata.get_host(rpc) |
1175 | | - if peer and peer.monitor.is_up: |
1176 | | - versions.add(row.get("schema_version")) |
1177 | | - |
1178 | | - if len(versions) == 1: |
1179 | | - return True |
1180 | | - |
1181 | | - log.debug("[control connection] Schemas mismatched, trying again") |
1182 | | - self._time.sleep(0.2) |
1183 | | - elapsed = self._time.time() - start |
1184 | | - |
1185 | | - return False |
| 1191 | + return False |
1186 | 1192 |
|
1187 | 1193 | def _signal_error(self): |
1188 | 1194 | # try just signaling the host monitor, as this will trigger a reconnect |
|
0 commit comments