Skip to content

Commit f1b78e1

Browse files
committed
Merge remote-tracking branch 'origin/3.4'
Conflicts: benchmarks/base.py
2 parents c1a7c63 + 598cb76 commit f1b78e1

6 files changed

Lines changed: 208 additions & 46 deletions

File tree

benchmarks/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
def setup(options):
7373
log.info("Using 'cassandra' package from %s", cassandra.__path__)
7474

75-
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
75+
cluster = Cluster(hosts, schema_metadata_enabled=False, token_metadata_enabled=False)
7676
try:
7777
session = cluster.connect()
7878

@@ -107,8 +107,8 @@ def setup(options):
107107
cluster.shutdown()
108108

109109

110-
def teardown(options):
111-
cluster = Cluster(options.hosts, protocol_version=options.protocol_version)
110+
def teardown(hosts):
111+
cluster = Cluster(hosts, schema_metadata_enabled=False, token_metadata_enabled=False)
112112
session = cluster.connect()
113113
if not options.keep_data:
114114
session.execute("DROP KEYSPACE " + options.keyspace)

cassandra/cluster.py

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import sys
2828
import time
2929
from threading import Lock, RLock, Thread, Event
30-
import warnings
3130

3231
import six
3332
from six.moves import range
@@ -166,10 +165,24 @@ def new_f(self, *args, **kwargs):
166165
return new_f
167166

168167

169-
def _shutdown_cluster(cluster):
170-
if cluster and not cluster.is_shutdown:
168+
_clusters_for_shutdown = set()
169+
170+
171+
def _register_cluster_shutdown(cluster):
172+
_clusters_for_shutdown.add(cluster)
173+
174+
175+
def _discard_cluster_shutdown(cluster):
176+
_clusters_for_shutdown.discard(cluster)
177+
178+
179+
def _shutdown_clusters():
180+
clusters = _clusters_for_shutdown.copy() # copy because shutdown modifies the global set "discard"
181+
for cluster in clusters:
171182
cluster.shutdown()
172183

184+
atexit.register(_shutdown_clusters)
185+
173186

174187
# murmur3 implementation required for TokenAware is only available for CPython
175188
import platform
@@ -868,7 +881,9 @@ def protocol_downgrade(self, host_addr, previous_version):
868881
new_version = previous_version - 1
869882
if new_version < self.protocol_version:
870883
if new_version >= MIN_SUPPORTED_VERSION:
871-
log.warning("Downgrading core protocol version from %d to %d for %s", self.protocol_version, new_version, host_addr)
884+
log.warning("Downgrading core protocol version from %d to %d for %s. "
885+
"To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. "
886+
"http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_addr)
872887
self.protocol_version = new_version
873888
else:
874889
raise DriverException("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version, MIN_SUPPORTED_VERSION))
@@ -887,7 +902,7 @@ def connect(self, keyspace=None):
887902
log.debug("Connecting to cluster, contact points: %s; protocol version: %s",
888903
self.contact_points, self.protocol_version)
889904
self.connection_class.initialize_reactor()
890-
atexit.register(partial(_shutdown_cluster, self))
905+
_register_cluster_shutdown(self)
891906
for address in self.contact_points_resolved:
892907
host, new = self.add_host(address, signal=False)
893908
if new:
@@ -951,6 +966,8 @@ def shutdown(self):
951966

952967
self.executor.shutdown()
953968

969+
_discard_cluster_shutdown(self)
970+
954971
def _new_session(self):
955972
session = Session(self, self.metadata.all_hosts())
956973
self._session_register_user_types(session)
@@ -1144,7 +1161,7 @@ def on_add(self, host, refresh_nodes=True):
11441161
if distance == HostDistance.IGNORED:
11451162
log.debug("Not adding connection pool for new host %r because the "
11461163
"load balancing policy has marked it as IGNORED", host)
1147-
self._finalize_add(host)
1164+
self._finalize_add(host, set_up=False)
11481165
return
11491166

11501167
futures_lock = Lock()
@@ -1186,9 +1203,10 @@ def future_completed(future):
11861203
if not have_future:
11871204
self._finalize_add(host)
11881205

1189-
def _finalize_add(self, host):
1190-
# mark the host as up and notify all listeners
1191-
host.set_up()
1206+
def _finalize_add(self, host, set_up=True):
1207+
if set_up:
1208+
host.set_up()
1209+
11921210
for listener in self.listeners:
11931211
listener.on_add(host)
11941212

@@ -2356,6 +2374,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
23562374
cluster_name = local_row["cluster_name"]
23572375
self._cluster.metadata.cluster_name = cluster_name
23582376

2377+
partitioner = local_row.get("partitioner")
2378+
tokens = local_row.get("tokens")
2379+
23592380
host = self._cluster.metadata.get_host(connection.host)
23602381
if host:
23612382
datacenter = local_row.get("data_center")
@@ -2365,10 +2386,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
23652386
host.broadcast_address = local_row.get("broadcast_address")
23662387
host.release_version = local_row.get("release_version")
23672388

2368-
partitioner = local_row.get("partitioner")
2369-
tokens = local_row.get("tokens")
2370-
if partitioner and tokens:
2371-
token_map[host] = tokens
2389+
if partitioner and tokens:
2390+
token_map[host] = tokens
23722391

23732392
# Check metadata.partitioner to see if we haven't built anything yet. If
23742393
# every node in the cluster was in the contact points, we won't discover
@@ -2550,13 +2569,14 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25502569
if local_row.get("schema_version"):
25512570
versions[local_row.get("schema_version")].add(local_address)
25522571

2572+
lbp = self._cluster.load_balancing_policy
25532573
for row in peers_result:
25542574
schema_ver = row.get('schema_version')
25552575
if not schema_ver:
25562576
continue
25572577
addr = self._rpc_from_peer_row(row)
25582578
peer = self._cluster.metadata.get_host(addr)
2559-
if peer and peer.is_up:
2579+
if peer and peer.is_up and lbp.distance(peer) != HostDistance.IGNORED:
25602580
versions[schema_ver].add(addr)
25612581

25622582
if len(versions) == 1:
@@ -2609,7 +2629,13 @@ def on_add(self, host, refresh_nodes=True):
26092629
self.refresh_node_list_and_token_map(force_token_rebuild=True)
26102630

26112631
def on_remove(self, host):
2612-
self.refresh_node_list_and_token_map(force_token_rebuild=True)
2632+
c = self._connection
2633+
if c and c.host == host.address:
2634+
log.debug("[control connection] Control connection host (%s) is being removed. Reconnecting", host)
2635+
# refresh will be done on reconnect
2636+
self.reconnect()
2637+
else:
2638+
self.refresh_node_list_and_token_map(force_token_rebuild=True)
26132639

26142640
def get_connections(self):
26152641
c = getattr(self, '_connection', None)
@@ -2630,7 +2656,7 @@ def _stop_scheduler(scheduler, thread):
26302656
thread.join()
26312657

26322658

2633-
class _Scheduler(object):
2659+
class _Scheduler(Thread):
26342660

26352661
_queue = None
26362662
_scheduled_tasks = None
@@ -2643,13 +2669,9 @@ def __init__(self, executor):
26432669
self._count = count()
26442670
self._executor = executor
26452671

2646-
t = Thread(target=self.run, name="Task Scheduler")
2647-
t.daemon = True
2648-
t.start()
2649-
2650-
# although this runs on a daemonized thread, we prefer to stop
2651-
# it gracefully to avoid random errors during interpreter shutdown
2652-
atexit.register(partial(_stop_scheduler, weakref.proxy(self), t))
2672+
Thread.__init__(self, name="Task Scheduler")
2673+
self.daemon = True
2674+
self.start()
26532675

26542676
def shutdown(self):
26552677
try:
@@ -2659,6 +2681,7 @@ def shutdown(self):
26592681
pass
26602682
self.is_shutdown = True
26612683
self._queue.put_nowait((0, 0, None))
2684+
self.join()
26622685

26632686
def schedule(self, delay, fn, *args, **kwargs):
26642687
self._insert_task(delay, (fn, args, tuple(kwargs.items())))
@@ -2687,7 +2710,8 @@ def run(self):
26872710
while True:
26882711
run_at, i, task = self._queue.get(block=True, timeout=None)
26892712
if self.is_shutdown:
2690-
log.debug("Not executing scheduled task due to Scheduler shutdown")
2713+
if task:
2714+
log.debug("Not executing scheduled task due to Scheduler shutdown")
26912715
return
26922716
if run_at <= time.time():
26932717
self._scheduled_tasks.discard(task)

cassandra/connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -601,8 +601,8 @@ def process_msg(self, header, body):
601601
if isinstance(response, ProtocolException):
602602
if 'unsupported protocol version' in response.message:
603603
self.is_unsupported_proto_version = True
604-
605-
log.error("Closing connection %s due to protocol error: %s", self, response.summary_msg())
604+
else:
605+
log.error("Closing connection %s due to protocol error: %s", self, response.summary_msg())
606606
self.defunct(response)
607607
if callback is not None:
608608
callback(response)

0 commit comments

Comments
 (0)