@@ -2065,6 +2065,8 @@ def __init__(self, cluster, timeout,
20652065 self ._reconnection_handler = None
20662066 self ._reconnection_lock = RLock ()
20672067
2068+ self ._event_schedule_times = {}
2069+
20682070 def connect (self ):
20692071 if self ._is_shutdown :
20702072 return
@@ -2501,12 +2503,26 @@ def _update_location_info(self, host, datacenter, rack):
25012503 self ._cluster .load_balancing_policy .on_up (host )
25022504 return True
25032505
2506+ def _delay_for_event_type (self , event_type , delay_window ):
2507+ # this serves to order processing correlated events (received within the window)
2508+ # the window and randomization still have the desired effect of skew across client instances
2509+ next_time = self ._event_schedule_times .get (event_type , 0 )
2510+ now = self ._time .time ()
2511+ if now <= next_time :
2512+ this_time = next_time + 0.01
2513+ delay = this_time - now
2514+ else :
2515+ delay = random () * delay_window
2516+ this_time = now + delay
2517+ self ._event_schedule_times [event_type ] = this_time
2518+ return delay
2519+
25042520 def _handle_topology_change (self , event ):
25052521 change_type = event ["change_type" ]
25062522 addr , port = event ["address" ]
25072523 if change_type == "NEW_NODE" or change_type == "MOVED_NODE" :
25082524 if self ._topology_event_refresh_window >= 0 :
2509- delay = random () * self ._topology_event_refresh_window
2525+ delay = self . _delay_for_event_type ( 'topology_change' , self ._topology_event_refresh_window )
25102526 self ._cluster .scheduler .schedule_unique (delay , self .refresh_node_list_and_token_map )
25112527 elif change_type == "REMOVED_NODE" :
25122528 host = self ._cluster .metadata .get_host (addr )
@@ -2517,7 +2533,7 @@ def _handle_status_change(self, event):
25172533 addr , port = event ["address" ]
25182534 host = self ._cluster .metadata .get_host (addr )
25192535 if change_type == "UP" :
2520- delay = 1 + random () * 0.5 # randomness to avoid thundering herd problem on events
2536+ delay = 1 + self . _delay_for_event_type ( 'status_change' , 0.5 ) # randomness to avoid thundering herd problem on events
25212537 if host is None :
25222538 # this is the first time we've seen the node
25232539 self ._cluster .scheduler .schedule_unique (delay , self .refresh_node_list_and_token_map )
@@ -2540,7 +2556,7 @@ def _handle_schema_change(self, event):
25402556 usertype = event .get ('type' )
25412557 function = event .get ('function' )
25422558 aggregate = event .get ('aggregate' )
2543- delay = random () * self ._schema_event_refresh_window
2559+ delay = self . _delay_for_event_type ( 'schema_change' , self ._schema_event_refresh_window )
25442560 self ._cluster .scheduler .schedule_unique (delay , self .refresh_schema , keyspace , table , usertype , function , aggregate )
25452561
25462562 def wait_for_schema_agreement (self , connection = None , preloaded_results = None , wait_time = None ):
0 commit comments