Skip to content

Commit 01015ce

Browse files
committed
Avoid adding hosts to load balancing policy without DC/rack info
1 parent a6a5935 commit 01015ce

6 files changed

Lines changed: 33 additions & 13 deletions

File tree

CHANGELOG.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ Bug Fixes
2121
if the callback was added outside of the event loop thread (github issue #95)
2222
* Properly escape keyspace name in Session.set_keyspace(). Previously, the
2323
keyspace name was quoted, but any quotes in the string were not escaped.
24+
* Avoid adding hosts to the load balancing policy before their datacenter
25+
and rack information has been set, if possible.
2426

2527
Other
2628
-----

cassandra/cluster.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -759,13 +759,13 @@ def signal_connection_failure(self, host, connection_exc, is_host_addition):
759759
self.on_down(host, is_host_addition, force_if_down=True)
760760
return is_down
761761

762-
def add_host(self, address, signal):
762+
def add_host(self, address, datacenter=None, rack=None, signal=True):
763763
"""
764764
Called when adding initial contact points and when the control
765765
connection subsequently discovers a new node. Intended for internal
766766
use only.
767767
"""
768-
new_host = self.metadata.add_host(address)
768+
new_host = self.metadata.add_host(address, datacenter, rack)
769769
if new_host and signal:
770770
log.info("New Cassandra host %s added", address)
771771
self.on_add(new_host)
@@ -1580,10 +1580,13 @@ def _refresh_node_list_and_token_map(self, connection):
15801580
found_hosts.add(addr)
15811581

15821582
host = self._cluster.metadata.get_host(addr)
1583+
datacenter = row.get("data_center")
1584+
rack = row.get("rack")
15831585
if host is None:
15841586
log.debug("[control connection] Found new host to connect to: %s", addr)
1585-
host = self._cluster.add_host(addr, signal=True)
1586-
host.set_location_info(row.get("data_center"), row.get("rack"))
1587+
host = self._cluster.add_host(addr, datacenter, rack, signal=True)
1588+
else:
1589+
self._update_location_info(host, datacenter, rack)
15871590

15881591
tokens = row.get("tokens")
15891592
if partitioner and tokens:
@@ -1600,11 +1603,22 @@ def _refresh_node_list_and_token_map(self, connection):
16001603
log.debug("[control connection] Fetched ring info, rebuilding metadata")
16011604
self._cluster.metadata.rebuild_token_map(partitioner, token_map)
16021605

1606+
def _update_location_info(self, host, datacenter, rack):
1607+
if host.datacenter == datacenter and host.rack == rack:
1608+
return
1609+
1610+
# If the dc/rack information changes, we need to update the load balancing policy.
1611+
# For that, we remove and re-add the node against the policy. Not the most elegant, and assumes
1612+
# that the policy will update correctly, but in practice this should work.
1613+
self._cluster.load_balancing_policy.on_down(host)
1614+
host.set_location_info(datacenter, rack)
1615+
self._cluster.load_balancing_policy.on_up(host)
1616+
16031617
def _handle_topology_change(self, event):
16041618
change_type = event["change_type"]
16051619
addr, port = event["address"]
16061620
if change_type == "NEW_NODE":
1607-
self._cluster.scheduler.schedule(10, self._cluster.add_host, addr, signal=True)
1621+
self._cluster.scheduler.schedule(10, self.refresh_node_list_and_token_map)
16081622
elif change_type == "REMOVED_NODE":
16091623
host = self._cluster.metadata.get_host(addr)
16101624
self._cluster.scheduler.schedule(0, self._cluster.remove_host, host)
@@ -1618,7 +1632,7 @@ def _handle_status_change(self, event):
16181632
if change_type == "UP":
16191633
if host is None:
16201634
# this is the first time we've seen the node
1621-
self._cluster.scheduler.schedule(1, self._cluster.add_host, addr, signal=True)
1635+
self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map)
16221636
else:
16231637
# this will be run by the scheduler
16241638
self._cluster.scheduler.schedule(1, self._cluster.on_up, host)

cassandra/metadata.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,12 @@ def can_support_partitioner(self):
348348
else:
349349
return True
350350

351-
def add_host(self, address):
351+
def add_host(self, address, datacenter, rack):
352352
cluster = self.cluster_ref()
353353
with self._hosts_lock:
354354
if address not in self._hosts:
355-
new_host = Host(address, cluster.conviction_policy_factory)
355+
new_host = Host(
356+
address, cluster.conviction_policy_factory, datacenter, rack)
356357
self._hosts[address] = new_host
357358
else:
358359
return None

cassandra/policies.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ class RoundRobinPolicy(LoadBalancingPolicy):
134134
135135
This load balancing policy is used by default.
136136
"""
137+
_live_hosts = frozenset(())
137138

138139
def populate(self, cluster, hosts):
139140
self._live_hosts = frozenset(hosts)

cassandra/pool.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,15 @@ class Host(object):
5757
_currently_handling_node_up = False
5858
_handle_node_up_condition = None
5959

60-
def __init__(self, inet_address, conviction_policy_factory):
60+
def __init__(self, inet_address, conviction_policy_factory, datacenter=None, rack=None):
6161
if inet_address is None:
6262
raise ValueError("inet_address may not be None")
6363
if conviction_policy_factory is None:
6464
raise ValueError("conviction_policy_factory may not be None")
6565

6666
self.address = inet_address
6767
self.conviction_policy = conviction_policy_factory(self)
68+
self.set_location_info(datacenter, rack)
6869
self.lock = RLock()
6970
self._handle_node_up_condition = Condition()
7071

tests/unit/test_control_connection.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ def __init__(self):
5959
self.scheduler = Mock(spec=_Scheduler)
6060
self.executor = Mock(spec=ThreadPoolExecutor)
6161

62-
def add_host(self, address, signal=False):
63-
host = Host(address, SimpleConvictionPolicy)
62+
def add_host(self, address, datacenter, rack, signal=False):
63+
host = Host(address, SimpleConvictionPolicy, datacenter, rack)
6464
self.added_hosts.append(host)
6565
return host
6666

@@ -212,6 +212,7 @@ def test_refresh_nodes_and_tokens_add_host(self):
212212
self.connection.peer_results[1].append(
213213
["192.168.1.3", "10.0.0.3", "a", "dc1", "rack1", ["3", "103", "203"]]
214214
)
215+
self.cluster.scheduler.schedule = lambda delay, f, *args, **kwargs: f(*args, **kwargs)
215216
self.control_connection.refresh_node_list_and_token_map()
216217
self.assertEqual(1, len(self.cluster.added_hosts))
217218
self.assertEqual(self.cluster.added_hosts[0].address, "192.168.1.3")
@@ -250,7 +251,7 @@ def test_handle_topology_change(self):
250251
'address': ('1.2.3.4', 9000)
251252
}
252253
self.control_connection._handle_topology_change(event)
253-
self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.add_host, '1.2.3.4', signal=True)
254+
self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
254255

255256
event = {
256257
'change_type': 'REMOVED_NODE',
@@ -272,7 +273,7 @@ def test_handle_status_change(self):
272273
'address': ('1.2.3.4', 9000)
273274
}
274275
self.control_connection._handle_status_change(event)
275-
self.cluster.scheduler.schedule.assert_called_with(ANY, self.cluster.add_host, '1.2.3.4', signal=True)
276+
self.cluster.scheduler.schedule.assert_called_with(ANY, self.control_connection.refresh_node_list_and_token_map)
276277

277278
# do the same with a known Host
278279
event = {

0 commit comments

Comments
 (0)