44
55import logging
66import time
7- from threading import Lock , RLock , Condition
7+ from threading import RLock , Condition
88import weakref
99try :
1010 from weakref import WeakSet
@@ -35,15 +35,18 @@ class Host(object):
3535 The IP address or hostname of the node.
3636 """
3737
38- monitor = None
38+ conviction_policy = None
3939 """
40- A : class:`.HealthMonitor ` instance that tracks whether this node is
41- up or down.
40+ A class:`ConvictionPolicy ` instance for determining when this node should
41+ be marked up or down.
4242 """
4343
44+ is_up = None
45+
4446 _datacenter = None
4547 _rack = None
4648 _reconnection_handler = None
49+ lock = None
4750
4851 def __init__ (self , inet_address , conviction_policy_factory ):
4952 if inet_address is None :
@@ -52,9 +55,8 @@ def __init__(self, inet_address, conviction_policy_factory):
5255 raise ValueError ("conviction_policy_factory may not be None" )
5356
5457 self .address = inet_address
55- self .monitor = HealthMonitor (conviction_policy_factory (self ))
56-
57- self ._reconnection_lock = Lock ()
58+ self .conviction_policy = conviction_policy_factory (self )
59+ self .lock = RLock ()
5860
5961 @property
6062 def datacenter (self ):
@@ -75,12 +77,25 @@ def set_location_info(self, datacenter, rack):
7577 self ._datacenter = datacenter
7678 self ._rack = rack
7779
80+ def set_up (self ):
81+ self .conviction_policy .reset ()
82+ self .is_up = True
83+
84+ def set_down (self ):
85+ self .is_up = False
86+
87+ def signal_connection_failure (self , connection_exc ):
88+ return self .conviction_policy .add_failure (connection_exc )
89+
90+ def is_currently_reconnecting (self ):
91+ return self ._reconnection_handler is not None
92+
7893 def get_and_set_reconnection_handler (self , new_handler ):
7994 """
8095 Atomically replaces the reconnection handler for this
8196 host. Intended for internal use only.
8297 """
83- with self ._reconnection_lock :
98+ with self .lock :
8499 old = self ._reconnection_handler
85100 self ._reconnection_handler = new_handler
86101 return old
@@ -175,94 +190,35 @@ def on_exception(self, exc, next_delay):
175190
176191class _HostReconnectionHandler (_ReconnectionHandler ):
177192
178- def __init__ (self , host , connection_factory , * args , ** kwargs ):
193+ def __init__ (self , host , connection_factory , is_host_addition , on_add , on_up , * args , ** kwargs ):
179194 _ReconnectionHandler .__init__ (self , * args , ** kwargs )
195+ self .is_host_addition = is_host_addition
196+ self .on_add = on_add
197+ self .on_up = on_up
180198 self .host = host
181199 self .connection_factory = connection_factory
182200
183201 def try_reconnect (self ):
184202 return self .connection_factory ()
185203
186204 def on_reconnection (self , connection ):
187- self .host .monitor .reset ()
205+ connection .close ()
206+ log .info ("Successful reconnection to %s, marking node up" , self .host )
207+ if self .is_host_addition :
208+ self .on_add (self .host )
209+ else :
210+ self .on_up (self .host )
188211
189212 def on_exception (self , exc , next_delay ):
190213 if isinstance (exc , AuthenticationFailed ):
191214 return False
192215 else :
193- log .warn ("Error attempting to reconnect to %s: %s" , self .host , exc )
216+ log .warn ("Error attempting to reconnect to %s, scheduling retry in %f seconds: %s" ,
217+ self .host , next_delay , exc )
194218 log .debug ("Reconnection error details" , exc_info = True )
195219 return True
196220
197221
198- class HealthMonitor (object ):
199- """
200- Monitors whether a particular host is marked as up or down.
201- This class is primarily intended for internal use, although
202- applications may find it useful to check whether a given node
203- is up or down.
204- """
205-
206- is_up = True
207- """
208- A boolean representing the current state of the node.
209- """
210-
211- def __init__ (self , conviction_policy ):
212- self ._conviction_policy = conviction_policy
213- self ._host = conviction_policy .host
214- # self._listeners will hold, among other things, references to
215- # Cluster objects. To allow those to be GC'ed (and shutdown) even
216- # though we've implemented __del__, use weak references.
217- self ._listeners = WeakSet ()
218- self ._lock = RLock ()
219-
220- def register (self , listener ):
221- with self ._lock :
222- self ._listeners .add (listener )
223-
224- def unregister (self , listener ):
225- with self ._lock :
226- self ._listeners .remove (listener )
227-
228- def set_up (self ):
229- if self .is_up :
230- return
231-
232- self ._conviction_policy .reset ()
233- log .info ("Host %s is considered up" , self ._host )
234-
235- with self ._lock :
236- listeners = self ._listeners .copy ()
237-
238- for listener in listeners :
239- listener .on_up (self ._host )
240-
241- self .is_up = True
242-
243- def set_down (self ):
244- if not self .is_up :
245- return
246-
247- self .is_up = False
248- log .info ("Host %s is considered down" , self ._host )
249-
250- with self ._lock :
251- listeners = self ._listeners .copy ()
252-
253- for listener in listeners :
254- listener .on_down (self ._host )
255-
256- def reset (self ):
257- return self .set_up ()
258-
259- def signal_connection_failure (self , connection_exc ):
260- is_down = self ._conviction_policy .add_failure (connection_exc )
261- if is_down :
262- self .set_down ()
263- return is_down
264-
265-
266222_MAX_SIMULTANEOUS_CREATION = 1
267223_NEW_CONNECTION_GRACE_PERIOD = 5
268224
@@ -295,6 +251,7 @@ def __init__(self, host, host_distance, session):
295251
296252 self ._trash = set ()
297253 self .open_count = core_conns
254+ log .debug ("Finished initializing new connection pool for host %s" , self .host )
298255
299256 def borrow_connection (self , timeout ):
300257 if self .is_shutdown :
@@ -395,7 +352,7 @@ def _add_conn_if_under_max(self):
395352 log .exception ("Failed to add new connection to pool for host %s" , self .host )
396353 with self ._lock :
397354 self .open_count -= 1
398- if self .host . monitor .signal_connection_failure (exc ):
355+ if self ._session . cluster .signal_connection_failure (self . host , exc , is_host_addition = False ):
399356 self .shutdown ()
400357 return False
401358 except AuthenticationFailed :
@@ -448,7 +405,8 @@ def return_connection(self, connection):
448405 if connection .is_defunct or connection .is_closed :
449406 log .debug ("Defunct or closed connection (%s) returned to pool, potentially "
450407 "marking host %s as down" , id (connection ), self .host )
451- is_down = self .host .monitor .signal_connection_failure (connection .last_error )
408+ is_down = self ._session .cluster .signal_connection_failure (
409+ self .host , connection .last_error , is_host_addition = False )
452410 if is_down :
453411 self .shutdown ()
454412 else :
0 commit comments