Skip to content

Commit d0ee3bc

Browse files
committed
Make DCAware LBP tolerate DC changes during query plan
Fixes a RuntimeError that would raise if the DCAwareRoundRobinPolicy DC:host map changed during generation. PYTHON-297
1 parent 56be4cb commit d0ee3bc

2 files changed

Lines changed: 159 additions & 5 deletions

File tree

cassandra/policies.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,11 +267,11 @@ def make_query_plan(self, working_keyspace=None, query=None):
267267
for host in islice(cycle(local_live), pos, pos + len(local_live)):
268268
yield host
269269

270-
for dc, current_dc_hosts in six.iteritems(self._dc_live_hosts):
271-
if dc == self.local_dc:
272-
continue
273-
274-
for host in current_dc_hosts[:self.used_hosts_per_remote_dc]:
270+
# the dict can change, so get candidate DCs iterating over keys of a copy
271+
other_dcs = [dc for dc in self._dc_live_hosts.copy().keys() if dc != self.local_dc]
272+
for dc in other_dcs:
273+
remote_live = self._dc_live_hosts.get(dc, ())
274+
for host in remote_live[:self.used_hosts_per_remote_dc]:
275275
yield host
276276

277277
def on_up(self, host):

tests/unit/test_policies.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,160 @@ def test_status_updates(self):
292292
qplan = list(policy.make_query_plan())
293293
self.assertEqual(qplan, [])
294294

295+
def test_modification_during_generation(self):
296+
hosts = [Host(i, SimpleConvictionPolicy) for i in range(4)]
297+
for h in hosts[:2]:
298+
h.set_location_info("dc1", "rack1")
299+
for h in hosts[2:]:
300+
h.set_location_info("dc2", "rack1")
301+
302+
policy = DCAwareRoundRobinPolicy("dc1", used_hosts_per_remote_dc=3)
303+
policy.populate(Mock(), hosts)
304+
305+
# The general concept here is to change thee internal state of the
306+
# policy during plan generation. In this case we use a grey-box
307+
# approach that changes specific things during known phases of the
308+
# generator.
309+
310+
new_host = Host(4, SimpleConvictionPolicy)
311+
new_host.set_location_info("dc1", "rack1")
312+
313+
# new local before iteration
314+
plan = policy.make_query_plan()
315+
policy.on_up(new_host)
316+
# local list is not bound yet, so we get to see that one
317+
self.assertEqual(len(list(plan)), 3 + 2)
318+
319+
# remove local before iteration
320+
plan = policy.make_query_plan()
321+
policy.on_down(new_host)
322+
# local list is not bound yet, so we don't see it
323+
self.assertEqual(len(list(plan)), 2 + 2)
324+
325+
# new local after starting iteration
326+
plan = policy.make_query_plan()
327+
next(plan)
328+
policy.on_up(new_host)
329+
# local list was is bound, and one consumed, so we only see the other original
330+
self.assertEqual(len(list(plan)), 1 + 2)
331+
332+
# remove local after traversing available
333+
plan = policy.make_query_plan()
334+
for _ in range(3):
335+
next(plan)
336+
policy.on_down(new_host)
337+
# we should be past the local list
338+
self.assertEqual(len(list(plan)), 0 + 2)
339+
340+
# REMOTES CHANGE
341+
new_host.set_location_info("dc2", "rack1")
342+
343+
# new remote after traversing local, but not starting remote
344+
plan = policy.make_query_plan()
345+
for _ in range(2):
346+
next(plan)
347+
policy.on_up(new_host)
348+
# list is updated before we get to it
349+
self.assertEqual(len(list(plan)), 0 + 3)
350+
351+
# remove remote after traversing local, but not starting remote
352+
plan = policy.make_query_plan()
353+
for _ in range(2):
354+
next(plan)
355+
policy.on_down(new_host)
356+
# list is updated before we get to it
357+
self.assertEqual(len(list(plan)), 0 + 2)
358+
359+
# new remote after traversing local, and starting remote
360+
plan = policy.make_query_plan()
361+
for _ in range(3):
362+
next(plan)
363+
policy.on_up(new_host)
364+
# slice is already made, and we've consumed one
365+
self.assertEqual(len(list(plan)), 0 + 1)
366+
367+
# remove remote after traversing local, and starting remote
368+
plan = policy.make_query_plan()
369+
for _ in range(3):
370+
next(plan)
371+
policy.on_down(new_host)
372+
# slice is created with all present, and we've consumed one
373+
self.assertEqual(len(list(plan)), 0 + 2)
374+
375+
# local DC disappears after finishing it, but not starting remote
376+
plan = policy.make_query_plan()
377+
for _ in range(2):
378+
next(plan)
379+
policy.on_down(hosts[0])
380+
policy.on_down(hosts[1])
381+
# dict traversal starts as normal
382+
self.assertEqual(len(list(plan)), 0 + 2)
383+
policy.on_up(hosts[0])
384+
policy.on_up(hosts[1])
385+
386+
# PYTHON-297 addresses the following cases, where DCs come and go
387+
# during generation
388+
# local DC disappears after finishing it, and starting remote
389+
plan = policy.make_query_plan()
390+
for _ in range(3):
391+
next(plan)
392+
policy.on_down(hosts[0])
393+
policy.on_down(hosts[1])
394+
# dict traversal has begun and consumed one
395+
self.assertEqual(len(list(plan)), 0 + 1)
396+
policy.on_up(hosts[0])
397+
policy.on_up(hosts[1])
398+
399+
# remote DC disappears after finishing local, but not starting remote
400+
plan = policy.make_query_plan()
401+
for _ in range(2):
402+
next(plan)
403+
policy.on_down(hosts[2])
404+
policy.on_down(hosts[3])
405+
# nothing left
406+
self.assertEqual(len(list(plan)), 0 + 0)
407+
policy.on_up(hosts[2])
408+
policy.on_up(hosts[3])
409+
410+
# remote DC disappears while traversing it
411+
plan = policy.make_query_plan()
412+
for _ in range(3):
413+
next(plan)
414+
policy.on_down(hosts[2])
415+
policy.on_down(hosts[3])
416+
# we continue with remainder of original list
417+
self.assertEqual(len(list(plan)), 0 + 1)
418+
policy.on_up(hosts[2])
419+
policy.on_up(hosts[3])
420+
421+
422+
another_host = Host(5, SimpleConvictionPolicy)
423+
another_host.set_location_info("dc3", "rack1")
424+
new_host.set_location_info("dc3", "rack1")
425+
426+
# new DC while traversing remote
427+
plan = policy.make_query_plan()
428+
for _ in range(3):
429+
next(plan)
430+
policy.on_up(new_host)
431+
policy.on_up(another_host)
432+
# we continue with remainder of original list
433+
self.assertEqual(len(list(plan)), 0 + 1)
434+
435+
# remote DC disappears after finishing it
436+
plan = policy.make_query_plan()
437+
for _ in range(3):
438+
next(plan)
439+
last_host_in_this_dc = next(plan)
440+
if last_host_in_this_dc in (new_host, another_host):
441+
down_hosts = [new_host, another_host]
442+
else:
443+
down_hosts = hosts[2:]
444+
for h in down_hosts:
445+
policy.on_down(h)
446+
# the last DC has two
447+
self.assertEqual(len(list(plan)), 0 + 2)
448+
295449
def test_no_live_nodes(self):
296450
"""
297451
Ensure query plan for a downed cluster will execute without errors

0 commit comments

Comments
 (0)