Skip to content

Commit ecd0376

Browse files
committed
always add to a random replica
Tested with `ccm create test-endpoints -n 5 -v 3.11.1` and the script below. Against the previous implementation resulted in length of each key looking something like this: {<Host: 127.0.0.1 datacenter1>: 60188, <Host: 127.0.0.2 datacenter1>: 19878, <Host: 127.0.0.3 datacenter1>: 19934} The new implementation's results look more like this: {<Host: 127.0.0.1 datacenter1>: 19866, <Host: 127.0.0.2 datacenter1>: 20224, <Host: 127.0.0.3 datacenter1>: 19869, <Host: 127.0.0.4 datacenter1>: 20009, <Host: 127.0.0.5 datacenter1>: 20032} What happens in the test (5 nodes, RF=3) is this: group_keys_by_replica checks C* keys and adds them, preferring hosts that are already there, until there are 3 hosts in keys_per_host. Once those 3 hosts are chosen, no new hosts will be added -- with RF=3 across 5 nodes, every key is guaranteed to map to >= 1 of the 3 hosts already in the map. The new implementation distributes values more evenly. ``` from cassandra.cluster import Cluster from cassandra.concurrent import execute_concurrent_with_args from cassandra.metadata import group_keys_by_replica import contextlib import logging import pprint from random import randint from textwrap import indent import time log = logging.getLogger(__name__) log.setLevel('DEBUG') handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")) log.addHandler(handler) @contextlib.contextmanager def debug_time(message): log.debug('START: {}'.format(message)) start_time = time.time() yield elapsed = time.time() - start_time log.debug( 'END ({elapsed:.2f}s): {message}'.format( elapsed=elapsed, message=message ) ) LOAD = True MIN, MAX = -2 ** 31, (2 ** 31) - 1 c = Cluster() s = c.connect() s.execute("CREATE KEYSPACE IF NOT EXISTS test_ks " "WITH replication = {" " 'class': 'SimpleStrategy', " " 'replication_factor': '3'" "}") s.execute("CREATE TABLE IF NOT EXISTS test_ks.test_tab (" " partition int, " " cluster int, " " PRIMARY KEY (partition, cluster)" ")") params = tuple(((randint(MIN, MAX), randint(MIN, MAX)) for _ in range(100000))) p = s.prepare("INSERT INTO test_ks.test_tab (partition, cluster) " "VALUES (?, ?)") with debug_time('insertion'): execute_concurrent_with_args( session=s, statement=p, parameters=params ) with debug_time('get replicas'): gkbr = group_keys_by_replica( s, "test_ks", "test_tab", params ) print('length of each key:') print(indent(pprint.pformat({k: len(v) for k, v in gkbr.items()}), ' ')) print('example values per key:') print(indent(pprint.pformat(({k: v[0:3] for k, v in gkbr.items()})), ' ')) ```
1 parent a30a095 commit ecd0376

1 file changed

Lines changed: 1 addition & 6 deletions

File tree

cassandra/metadata.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2611,12 +2611,7 @@ def group_keys_by_replica(session, keyspace, table, keys):
26112611
valid_replicas = [host for host in all_replicas if host.is_up]
26122612

26132613
if valid_replicas:
2614-
for replica in valid_replicas:
2615-
if replica in keys_per_host:
2616-
keys_per_host[replica].append(key)
2617-
break
2618-
else:
2619-
keys_per_host[random.choice(valid_replicas)].append(key)
2614+
keys_per_host[random.choice(valid_replicas)].append(key)
26202615
else:
26212616
# We will group under this statement all the keys for which
26222617
# we haven't found a valid replica

0 commit comments

Comments
 (0)