Skip to content

Commit 94d4d6f

Browse files
committed
Merge pull request apache#438 from datastax/378
PYTHON-378 - make NetworkTopologyStrategy consider rack placement
2 parents e76c142 + 699801a commit 94d4d6f

2 files changed

Lines changed: 69 additions & 15 deletions

File tree

cassandra/metadata.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -473,30 +473,29 @@ def __init__(self, dc_replication_factors):
473473
def make_token_replica_map(self, token_to_host_owner, ring):
474474
# note: this does not account for hosts having different racks
475475
replica_map = defaultdict(list)
476-
ring_len = len(ring)
477-
ring_len_range = range(ring_len)
478476
dc_rf_map = dict((dc, int(rf))
479477
for dc, rf in self.dc_replication_factors.items() if rf > 0)
480-
dcs = dict((h, h.datacenter) for h in set(token_to_host_owner.values()))
481478

482479
# build a map of DCs to lists of indexes into `ring` for tokens that
483480
# belong to that DC
484481
dc_to_token_offset = defaultdict(list)
482+
dc_racks = defaultdict(set)
485483
for i, token in enumerate(ring):
486484
host = token_to_host_owner[token]
487-
dc_to_token_offset[dcs[host]].append(i)
485+
dc_to_token_offset[host.datacenter].append(i)
486+
if host.datacenter and host.rack:
487+
dc_racks[host.datacenter].add(host.rack)
488488

489489
# A map of DCs to an index into the dc_to_token_offset value for that dc.
490490
# This is how we keep track of advancing around the ring for each DC.
491491
dc_to_current_index = defaultdict(int)
492492

493-
for i in ring_len_range:
494-
remaining = dc_rf_map.copy()
493+
for i in range(len(ring)):
495494
replicas = replica_map[ring[i]]
496495

497496
# go through each DC and find the replicas in that DC
498497
for dc in dc_to_token_offset.keys():
499-
if dc not in remaining:
498+
if dc not in dc_rf_map:
500499
continue
501500

502501
# advance our per-DC index until we're up to at least the
@@ -508,20 +507,33 @@ def make_token_replica_map(self, token_to_host_owner, ring):
508507
index += 1
509508
dc_to_current_index[dc] = index
510509

511-
# now add the next RF distinct token owners to the set of
512-
# replicas for this DC
510+
replicas_remaining = dc_rf_map[dc]
511+
skipped_hosts = []
512+
racks_placed = set()
513+
racks_this_dc = dc_racks[dc]
513514
for token_offset in islice(cycle(token_offsets), index, index + num_tokens):
514515
host = token_to_host_owner[ring[token_offset]]
516+
if replicas_remaining == 0:
517+
break
518+
515519
if host in replicas:
516520
continue
517521

522+
if host.rack in racks_placed and len(racks_placed) < len(racks_this_dc):
523+
skipped_hosts.append(host)
524+
continue
525+
518526
replicas.append(host)
519-
dc_remaining = remaining[dc] - 1
520-
if dc_remaining == 0:
521-
del remaining[dc]
522-
break
523-
else:
524-
remaining[dc] = dc_remaining
527+
replicas_remaining -= 1
528+
racks_placed.add(host.rack)
529+
530+
if len(racks_placed) == len(racks_this_dc):
531+
for host in skipped_hosts:
532+
if replicas_remaining == 0:
533+
break
534+
replicas.append(host)
535+
replicas_remaining -= 1
536+
del skipped_hosts[:]
525537

526538
return replica_map
527539

tests/unit/test_metadata.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,48 @@ def test_nts_make_token_replica_map(self):
111111

112112
self.assertItemsEqual(replica_map[MD5Token(0)], (dc1_1, dc1_2, dc2_1, dc2_2, dc3_1))
113113

114+
def test_nts_make_token_replica_map_multi_rack(self):
115+
token_to_host_owner = {}
116+
117+
# (A) not enough distinct racks, first skipped is used
118+
dc1_1 = Host('dc1.1', SimpleConvictionPolicy)
119+
dc1_2 = Host('dc1.2', SimpleConvictionPolicy)
120+
dc1_3 = Host('dc1.3', SimpleConvictionPolicy)
121+
dc1_4 = Host('dc1.4', SimpleConvictionPolicy)
122+
dc1_1.set_location_info('dc1', 'rack1')
123+
dc1_2.set_location_info('dc1', 'rack1')
124+
dc1_3.set_location_info('dc1', 'rack2')
125+
dc1_4.set_location_info('dc1', 'rack2')
126+
token_to_host_owner[MD5Token(0)] = dc1_1
127+
token_to_host_owner[MD5Token(100)] = dc1_2
128+
token_to_host_owner[MD5Token(200)] = dc1_3
129+
token_to_host_owner[MD5Token(300)] = dc1_4
130+
131+
# (B) distinct racks, but not contiguous
132+
dc2_1 = Host('dc2.1', SimpleConvictionPolicy)
133+
dc2_2 = Host('dc2.2', SimpleConvictionPolicy)
134+
dc2_3 = Host('dc2.3', SimpleConvictionPolicy)
135+
dc2_1.set_location_info('dc2', 'rack1')
136+
dc2_2.set_location_info('dc2', 'rack1')
137+
dc2_3.set_location_info('dc2', 'rack2')
138+
token_to_host_owner[MD5Token(1)] = dc2_1
139+
token_to_host_owner[MD5Token(101)] = dc2_2
140+
token_to_host_owner[MD5Token(201)] = dc2_3
141+
142+
ring = [MD5Token(0),
143+
MD5Token(1),
144+
MD5Token(100),
145+
MD5Token(101),
146+
MD5Token(200),
147+
MD5Token(201),
148+
MD5Token(300)]
149+
150+
nts = NetworkTopologyStrategy({'dc1': 3, 'dc2': 2})
151+
replica_map = nts.make_token_replica_map(token_to_host_owner, ring)
152+
153+
token_replicas = replica_map[MD5Token(0)]
154+
self.assertItemsEqual(token_replicas, (dc1_1, dc1_2, dc1_3, dc2_1, dc2_3))
155+
114156
def test_nts_make_token_replica_map_empty_dc(self):
115157
host = Host('1', SimpleConvictionPolicy)
116158
host.set_location_info('dc1', 'rack1')

0 commit comments

Comments
 (0)