Commit ecd0376
committed
always add to a random replica
Tested with `ccm create test-endpoints -n 5 -v 3.11.1` and the script
below. Against the previous implementation resulted in
length of each key looking something like this:
{<Host: 127.0.0.1 datacenter1>: 60188,
<Host: 127.0.0.2 datacenter1>: 19878,
<Host: 127.0.0.3 datacenter1>: 19934}
The new implementation's results look more like this:
{<Host: 127.0.0.1 datacenter1>: 19866,
<Host: 127.0.0.2 datacenter1>: 20224,
<Host: 127.0.0.3 datacenter1>: 19869,
<Host: 127.0.0.4 datacenter1>: 20009,
<Host: 127.0.0.5 datacenter1>: 20032}
What happens in the test (5 nodes, RF=3) is this: group_keys_by_replica
checks C* keys and adds them, preferring hosts that are already there,
until there are 3 hosts in keys_per_host. Once those 3 hosts are chosen,
no new hosts will be added -- with RF=3 across 5 nodes, every key is
guaranteed to map to >= 1 of the 3 hosts already in the map.
The new implementation distributes values more evenly.
```
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.metadata import group_keys_by_replica
import contextlib
import logging
import pprint
from random import randint
from textwrap import indent
import time
log = logging.getLogger(__name__)
log.setLevel('DEBUG')
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
log.addHandler(handler)
@contextlib.contextmanager
def debug_time(message):
log.debug('START: {}'.format(message))
start_time = time.time()
yield
elapsed = time.time() - start_time
log.debug(
'END ({elapsed:.2f}s): {message}'.format(
elapsed=elapsed, message=message
)
)
LOAD = True
MIN, MAX = -2 ** 31, (2 ** 31) - 1
c = Cluster()
s = c.connect()
s.execute("CREATE KEYSPACE IF NOT EXISTS test_ks "
"WITH replication = {"
" 'class': 'SimpleStrategy', "
" 'replication_factor': '3'"
"}")
s.execute("CREATE TABLE IF NOT EXISTS test_ks.test_tab ("
" partition int, "
" cluster int, "
" PRIMARY KEY (partition, cluster)"
")")
params = tuple(((randint(MIN, MAX), randint(MIN, MAX)) for _ in range(100000)))
p = s.prepare("INSERT INTO test_ks.test_tab (partition, cluster) "
"VALUES (?, ?)")
with debug_time('insertion'):
execute_concurrent_with_args(
session=s,
statement=p,
parameters=params
)
with debug_time('get replicas'):
gkbr = group_keys_by_replica(
s, "test_ks", "test_tab",
params
)
print('length of each key:')
print(indent(pprint.pformat({k: len(v) for k, v in gkbr.items()}), ' '))
print('example values per key:')
print(indent(pprint.pformat(({k: v[0:3] for k, v in gkbr.items()})), ' '))
```1 parent a30a095 commit ecd0376
1 file changed
Lines changed: 1 addition & 6 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
2611 | 2611 | | |
2612 | 2612 | | |
2613 | 2613 | | |
2614 | | - | |
2615 | | - | |
2616 | | - | |
2617 | | - | |
2618 | | - | |
2619 | | - | |
| 2614 | + | |
2620 | 2615 | | |
2621 | 2616 | | |
2622 | 2617 | | |
| |||
0 commit comments