Skip to content

Commit 5777c20

Browse files
committed
Attempt to process correlated events the order in which the arrive
Events are mostly processed in order. "mostly" because there is still a race by way of the cluster thread pool executor. Meta class should be robust against this.
1 parent e9719ae commit 5777c20

3 files changed

Lines changed: 28 additions & 17 deletions

File tree

cassandra/cluster.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,6 +2065,8 @@ def __init__(self, cluster, timeout,
20652065
self._reconnection_handler = None
20662066
self._reconnection_lock = RLock()
20672067

2068+
self._event_schedule_times = {}
2069+
20682070
def connect(self):
20692071
if self._is_shutdown:
20702072
return
@@ -2501,12 +2503,26 @@ def _update_location_info(self, host, datacenter, rack):
25012503
self._cluster.load_balancing_policy.on_up(host)
25022504
return True
25032505

2506+
def _delay_for_event_type(self, event_type, delay_window):
2507+
# this serves to order processing correlated events (received within the window)
2508+
# the window and randomization still have the desired effect of skew across client instances
2509+
next_time = self._event_schedule_times.get(event_type, 0)
2510+
now = self._time.time()
2511+
if now <= next_time:
2512+
this_time = next_time + 0.01
2513+
delay = this_time - now
2514+
else:
2515+
delay = random() * delay_window
2516+
this_time = now + delay
2517+
self._event_schedule_times[event_type] = this_time
2518+
return delay
2519+
25042520
def _handle_topology_change(self, event):
25052521
change_type = event["change_type"]
25062522
addr, port = event["address"]
25072523
if change_type == "NEW_NODE" or change_type == "MOVED_NODE":
25082524
if self._topology_event_refresh_window >= 0:
2509-
delay = random() * self._topology_event_refresh_window
2525+
delay = self._delay_for_event_type('topology_change', self._topology_event_refresh_window)
25102526
self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map)
25112527
elif change_type == "REMOVED_NODE":
25122528
host = self._cluster.metadata.get_host(addr)
@@ -2517,7 +2533,7 @@ def _handle_status_change(self, event):
25172533
addr, port = event["address"]
25182534
host = self._cluster.metadata.get_host(addr)
25192535
if change_type == "UP":
2520-
delay = 1 + random() * 0.5 # randomness to avoid thundering herd problem on events
2536+
delay = 1 + self._delay_for_event_type('status_change', 0.5) # randomness to avoid thundering herd problem on events
25212537
if host is None:
25222538
# this is the first time we've seen the node
25232539
self._cluster.scheduler.schedule_unique(delay, self.refresh_node_list_and_token_map)
@@ -2540,7 +2556,7 @@ def _handle_schema_change(self, event):
25402556
usertype = event.get('type')
25412557
function = event.get('function')
25422558
aggregate = event.get('aggregate')
2543-
delay = random() * self._schema_event_refresh_window
2559+
delay = self._delay_for_event_type('schema_change', self._schema_event_refresh_window)
25442560
self._cluster.scheduler.schedule_unique(delay, self.refresh_schema, keyspace, table, usertype, function, aggregate)
25452561

25462562
def wait_for_schema_agreement(self, connection=None, preloaded_results=None, wait_time=None):

cassandra/metadata.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -236,21 +236,16 @@ def aggregate_changed(self, keyspace, aggregate, aggregate_results):
236236
pass
237237

238238
def table_changed(self, keyspace, table, cf_results, col_results, triggers_result):
239-
try:
240-
keyspace_meta = self.keyspaces[keyspace]
241-
except KeyError:
242-
# we're trying to update a table in a keyspace we don't know about
243-
log.warn("Tried to update schema for table '%s' in unknown keyspace '%s'",
244-
table, keyspace)
245-
return
246-
247-
if not cf_results:
248-
# the table was removed
249-
keyspace_meta._drop_table_metadata(table)
250-
else:
239+
if cf_results:
251240
assert len(cf_results) == 1
241+
keyspace_meta = self.keyspaces[keyspace]
252242
table_meta = self._build_table_metadata(keyspace_meta, cf_results[0], {table: col_results}, {table: triggers_result})
253243
keyspace_meta._add_table_metadata(table_meta)
244+
else:
245+
try:
246+
self.keyspaces[keyspace]._drop_table_metadata(table)
247+
except KeyError:
248+
pass
254249

255250
def _keyspace_added(self, ksname):
256251
if self.token_map:

tests/unit/test_control_connection.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,12 +410,12 @@ def test_handle_schema_change(self):
410410
}
411411
self.cluster.scheduler.reset_mock()
412412
self.control_connection._handle_schema_change(event)
413-
self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', 'table1', None, None, None)
413+
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_schema, 'ks1', 'table1', None, None, None)
414414

415415
self.cluster.scheduler.reset_mock()
416416
event['table'] = None
417417
self.control_connection._handle_schema_change(event)
418-
self.cluster.scheduler.schedule_unique.assert_called_once_with(0.0, self.control_connection.refresh_schema, 'ks1', None, None, None, None)
418+
self.cluster.scheduler.schedule_unique.assert_called_once_with(ANY, self.control_connection.refresh_schema, 'ks1', None, None, None, None)
419419

420420
def test_refresh_disabled(self):
421421
cluster = MockCluster()

0 commit comments

Comments
 (0)