2727import sys
2828import time
2929from threading import Lock , RLock , Thread , Event
30- import warnings
3130
3231import six
3332from six .moves import range
@@ -166,10 +165,24 @@ def new_f(self, *args, **kwargs):
166165 return new_f
167166
168167
169- def _shutdown_cluster (cluster ):
170- if cluster and not cluster .is_shutdown :
168+ _clusters_for_shutdown = set ()
169+
170+
171+ def _register_cluster_shutdown (cluster ):
172+ _clusters_for_shutdown .add (cluster )
173+
174+
175+ def _discard_cluster_shutdown (cluster ):
176+ _clusters_for_shutdown .discard (cluster )
177+
178+
179+ def _shutdown_clusters ():
180+ clusters = _clusters_for_shutdown .copy () # copy because shutdown modifies the global set "discard"
181+ for cluster in clusters :
171182 cluster .shutdown ()
172183
184+ atexit .register (_shutdown_clusters )
185+
173186
174187# murmur3 implementation required for TokenAware is only available for CPython
175188import platform
@@ -868,7 +881,9 @@ def protocol_downgrade(self, host_addr, previous_version):
868881 new_version = previous_version - 1
869882 if new_version < self .protocol_version :
870883 if new_version >= MIN_SUPPORTED_VERSION :
871- log .warning ("Downgrading core protocol version from %d to %d for %s" , self .protocol_version , new_version , host_addr )
884+ log .warning ("Downgrading core protocol version from %d to %d for %s. "
885+ "To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. "
886+ "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version" , self .protocol_version , new_version , host_addr )
872887 self .protocol_version = new_version
873888 else :
874889 raise DriverException ("Cannot downgrade protocol version (%d) below minimum supported version: %d" % (new_version , MIN_SUPPORTED_VERSION ))
@@ -887,7 +902,7 @@ def connect(self, keyspace=None):
887902 log .debug ("Connecting to cluster, contact points: %s; protocol version: %s" ,
888903 self .contact_points , self .protocol_version )
889904 self .connection_class .initialize_reactor ()
890- atexit . register ( partial ( _shutdown_cluster , self ) )
905+ _register_cluster_shutdown ( self )
891906 for address in self .contact_points_resolved :
892907 host , new = self .add_host (address , signal = False )
893908 if new :
@@ -951,6 +966,8 @@ def shutdown(self):
951966
952967 self .executor .shutdown ()
953968
969+ _discard_cluster_shutdown (self )
970+
954971 def _new_session (self ):
955972 session = Session (self , self .metadata .all_hosts ())
956973 self ._session_register_user_types (session )
@@ -1144,7 +1161,7 @@ def on_add(self, host, refresh_nodes=True):
11441161 if distance == HostDistance .IGNORED :
11451162 log .debug ("Not adding connection pool for new host %r because the "
11461163 "load balancing policy has marked it as IGNORED" , host )
1147- self ._finalize_add (host )
1164+ self ._finalize_add (host , set_up = False )
11481165 return
11491166
11501167 futures_lock = Lock ()
@@ -1186,9 +1203,10 @@ def future_completed(future):
11861203 if not have_future :
11871204 self ._finalize_add (host )
11881205
1189- def _finalize_add (self , host ):
1190- # mark the host as up and notify all listeners
1191- host .set_up ()
1206+ def _finalize_add (self , host , set_up = True ):
1207+ if set_up :
1208+ host .set_up ()
1209+
11921210 for listener in self .listeners :
11931211 listener .on_add (host )
11941212
@@ -2356,6 +2374,9 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
23562374 cluster_name = local_row ["cluster_name" ]
23572375 self ._cluster .metadata .cluster_name = cluster_name
23582376
2377+ partitioner = local_row .get ("partitioner" )
2378+ tokens = local_row .get ("tokens" )
2379+
23592380 host = self ._cluster .metadata .get_host (connection .host )
23602381 if host :
23612382 datacenter = local_row .get ("data_center" )
@@ -2365,10 +2386,8 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
23652386 host .broadcast_address = local_row .get ("broadcast_address" )
23662387 host .release_version = local_row .get ("release_version" )
23672388
2368- partitioner = local_row .get ("partitioner" )
2369- tokens = local_row .get ("tokens" )
2370- if partitioner and tokens :
2371- token_map [host ] = tokens
2389+ if partitioner and tokens :
2390+ token_map [host ] = tokens
23722391
23732392 # Check metadata.partitioner to see if we haven't built anything yet. If
23742393 # every node in the cluster was in the contact points, we won't discover
@@ -2550,13 +2569,14 @@ def _get_schema_mismatches(self, peers_result, local_result, local_address):
25502569 if local_row .get ("schema_version" ):
25512570 versions [local_row .get ("schema_version" )].add (local_address )
25522571
2572+ lbp = self ._cluster .load_balancing_policy
25532573 for row in peers_result :
25542574 schema_ver = row .get ('schema_version' )
25552575 if not schema_ver :
25562576 continue
25572577 addr = self ._rpc_from_peer_row (row )
25582578 peer = self ._cluster .metadata .get_host (addr )
2559- if peer and peer .is_up :
2579+ if peer and peer .is_up and lbp . distance ( peer ) != HostDistance . IGNORED :
25602580 versions [schema_ver ].add (addr )
25612581
25622582 if len (versions ) == 1 :
@@ -2609,7 +2629,13 @@ def on_add(self, host, refresh_nodes=True):
26092629 self .refresh_node_list_and_token_map (force_token_rebuild = True )
26102630
26112631 def on_remove (self , host ):
2612- self .refresh_node_list_and_token_map (force_token_rebuild = True )
2632+ c = self ._connection
2633+ if c and c .host == host .address :
2634+ log .debug ("[control connection] Control connection host (%s) is being removed. Reconnecting" , host )
2635+ # refresh will be done on reconnect
2636+ self .reconnect ()
2637+ else :
2638+ self .refresh_node_list_and_token_map (force_token_rebuild = True )
26132639
26142640 def get_connections (self ):
26152641 c = getattr (self , '_connection' , None )
@@ -2630,7 +2656,7 @@ def _stop_scheduler(scheduler, thread):
26302656 thread .join ()
26312657
26322658
2633- class _Scheduler (object ):
2659+ class _Scheduler (Thread ):
26342660
26352661 _queue = None
26362662 _scheduled_tasks = None
@@ -2643,13 +2669,9 @@ def __init__(self, executor):
26432669 self ._count = count ()
26442670 self ._executor = executor
26452671
2646- t = Thread (target = self .run , name = "Task Scheduler" )
2647- t .daemon = True
2648- t .start ()
2649-
2650- # although this runs on a daemonized thread, we prefer to stop
2651- # it gracefully to avoid random errors during interpreter shutdown
2652- atexit .register (partial (_stop_scheduler , weakref .proxy (self ), t ))
2672+ Thread .__init__ (self , name = "Task Scheduler" )
2673+ self .daemon = True
2674+ self .start ()
26532675
26542676 def shutdown (self ):
26552677 try :
@@ -2659,6 +2681,7 @@ def shutdown(self):
26592681 pass
26602682 self .is_shutdown = True
26612683 self ._queue .put_nowait ((0 , 0 , None ))
2684+ self .join ()
26622685
26632686 def schedule (self , delay , fn , * args , ** kwargs ):
26642687 self ._insert_task (delay , (fn , args , tuple (kwargs .items ())))
@@ -2687,7 +2710,8 @@ def run(self):
26872710 while True :
26882711 run_at , i , task = self ._queue .get (block = True , timeout = None )
26892712 if self .is_shutdown :
2690- log .debug ("Not executing scheduled task due to Scheduler shutdown" )
2713+ if task :
2714+ log .debug ("Not executing scheduled task due to Scheduler shutdown" )
26912715 return
26922716 if run_at <= time .time ():
26932717 self ._scheduled_tasks .discard (task )
0 commit comments