Skip to content

Commit 7a831b5

Browse files
davibo-ocSylvain Lebresne
authored andcommitted
ReplicationStrategy fixes and tests
1 parent 425627a commit 7a831b5

6 files changed

Lines changed: 1134 additions & 23 deletions

File tree

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,5 @@ testing/
77
doc
88
notes
99
.DS_Store
10+
bin/
11+
test-output/

driver-core/src/main/java/com/datastax/driver/core/ReplicationStategy.java

Lines changed: 84 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
package com.datastax.driver.core;
1717

1818
import java.util.HashMap;
19+
import java.util.HashSet;
20+
import java.util.Iterator;
21+
import java.util.LinkedHashSet;
1922
import java.util.List;
2023
import java.util.Map;
2124
import java.util.Set;
@@ -32,14 +35,19 @@ abstract class ReplicationStrategy {
3235
static ReplicationStrategy create(Map<String, String> replicationOptions) {
3336

3437
String strategyClass = replicationOptions.get("class");
35-
String repFactorString = replicationOptions.get("replication_factor");
36-
if (strategyClass == null | repFactorString == null)
38+
if (strategyClass == null)
3739
return null;
3840

3941
try {
4042
if (strategyClass.contains("SimpleStrategy")) {
41-
return new SimpleStrategy(Integer.parseInt(repFactorString));
42-
} else if (strategyClass.contains("SimpleStrategy")) {
43+
//replication_factor is only specified for SimpleStrategy
44+
String repFactorString = replicationOptions.get("replication_factor");
45+
if (repFactorString == null) {
46+
return null;
47+
} else {
48+
return new SimpleStrategy(Integer.parseInt(repFactorString));
49+
}
50+
} else if (strategyClass.contains("NetworkTopologyStrategy")) {
4351
Map<String, Integer> dcRfs = new HashMap<String, Integer>();
4452
for (Map.Entry<String, String> entry : replicationOptions.entrySet())
4553
{
@@ -79,10 +87,12 @@ Map<Token, Set<Host>> computeTokenToReplicaMap(Map<Token, Host> tokenToPrimary,
7987

8088
Map<Token, Set<Host>> replicaMap = new HashMap<Token, Set<Host>>(tokenToPrimary.size());
8189
for (int i = 0; i < ring.size(); i++) {
82-
ImmutableSet.Builder<Host> builder = ImmutableSet.builder();
83-
for (int j = 0; j < rf; j++)
84-
builder.add(tokenToPrimary.get(getTokenWrapping(i+j, ring)));
85-
replicaMap.put(ring.get(i), builder.build());
90+
//handle consecutive sections in the ring assigned to the same host
91+
Set<Host> replicas = new LinkedHashSet<Host>();
92+
//we stop when reached the desired RF or ran out of nodes
93+
for (int j = 0; j < ring.size() && replicas.size() < rf; j++)
94+
replicas.add(tokenToPrimary.get(getTokenWrapping(i+j, ring)));
95+
replicaMap.put(ring.get(i), ImmutableSet.<Host>builder().addAll(replicas).build());
8696
}
8797
return replicaMap;
8898
}
@@ -98,36 +108,87 @@ private NetworkTopologyStrategy(Map<String, Integer> replicationFactors) {
98108

99109
Map<Token, Set<Host>> computeTokenToReplicaMap(Map<Token, Host> tokenToPrimary, List<Token> ring) {
100110

101-
Map<Token, Set<Host>> replicaMap = new HashMap<Token, Set<Host>>(tokenToPrimary.size());
111+
/*
112+
* This is essentially a copy of org.apache.cassandra.locator.NetworkTopologyStrategy
113+
*/
114+
115+
final Map<String, Set<String>> racks = getRacksInDcs(tokenToPrimary.values());
116+
final Map<Token, Set<Host>> replicaMap = new HashMap<Token, Set<Host>>(tokenToPrimary.size());
102117
for (int i = 0; i < ring.size(); i++) {
103-
Map<String, Integer> remainings = new HashMap<String, Integer>(replicationFactors);
104-
ImmutableSet.Builder<Host> builder = ImmutableSet.builder();
105-
for (int j = 0; j < ring.size(); j++) {
106-
Host h = tokenToPrimary.get(getTokenWrapping(j, ring));
118+
Map<String, Set<Host>> allDcReplicas = new HashMap<String, Set<Host>>();
119+
Map<String, Set<String>> seenRacks = new HashMap<String, Set<String>>();
120+
Map<String, Set<Host>> skippedDcEndpoints = new HashMap<String, Set<Host>>();
121+
for (String dc : replicationFactors.keySet()) {
122+
allDcReplicas.put(dc, new HashSet<Host>());
123+
seenRacks.put(dc, new HashSet<String>());
124+
//preserve order
125+
skippedDcEndpoints.put(dc, new LinkedHashSet<Host>());
126+
}
127+
128+
//preserve order - primary replica will be first
129+
Set<Host> replicas = new LinkedHashSet<Host>();
130+
//we stop the inner iteration if all DCs have enough replicas or we reach the end of the ring
131+
for (int j = 0; j < ring.size() && !allDone(allDcReplicas); j++) {
132+
Host h = tokenToPrimary.get(getTokenWrapping(i + j, ring));
107133
String dc = h.getDatacenter();
108134
if (dc == null)
109135
continue;
110136

111-
Integer remaining = remainings.get(dc);
112-
if (remaining <= 0)
137+
Integer rf = replicationFactors.get(dc);
138+
Set<Host> dcReplicas = allDcReplicas.get(dc);
139+
if (dcReplicas.size() >= rf)
113140
continue;
114141

115-
builder.add(h);
116-
remainings.put(dc, remaining - 1);
117-
if (allDone(remainings))
118-
break;
142+
//check if we already visited all racks in dc
143+
if (seenRacks.get(dc).size() == racks.get(dc).size()) {
144+
replicas.add(h);
145+
dcReplicas.add(h);
146+
} else {
147+
String rack = h.getRack();
148+
//is this a new rack?
149+
if (seenRacks.get(dc).contains(rack)) {
150+
skippedDcEndpoints.get(dc).add(h);
151+
} else {
152+
replicas.add(h);
153+
dcReplicas.add(h);
154+
seenRacks.get(dc).add(rack);
155+
//check if we have run out of all racks
156+
//if yes, add all those nodes that we skipped so far
157+
if (seenRacks.get(dc).size() == racks.get(dc).size()) {
158+
Iterator<Host> skippedIt = skippedDcEndpoints.get(dc).iterator();
159+
while (skippedIt.hasNext() && dcReplicas.size() < rf) {
160+
Host nextSkipped = skippedIt.next();
161+
replicas.add(nextSkipped);
162+
dcReplicas.add(nextSkipped);
163+
}
164+
}
165+
}
166+
}
119167
}
120-
replicaMap.put(ring.get(i), builder.build());
168+
replicaMap.put(ring.get(i), ImmutableSet.<Host>builder().addAll(replicas).build());
121169
}
122170
return replicaMap;
123171
}
124172

125-
private boolean allDone(Map<String, Integer> map)
173+
private boolean allDone(Map<String, Set<Host>> map)
126174
{
127-
for (Map.Entry<String, Integer> entry : map.entrySet())
128-
if (entry.getValue() > 0)
175+
for (Map.Entry<String, Set<Host>> entry : map.entrySet())
176+
if (entry.getValue().size() < replicationFactors.get(entry.getKey()))
129177
return false;
130178
return true;
131179
}
180+
181+
private Map<String, Set<String>> getRacksInDcs(Iterable<Host> hosts) {
182+
Map<String, Set<String>> result = new HashMap<String, Set<String>>();
183+
for (Host host : hosts) {
184+
Set<String> racks = result.get(host.getDatacenter());
185+
if (racks == null) {
186+
racks = new HashSet<String>();
187+
result.put(host.getDatacenter(), racks);
188+
}
189+
racks.add(host.getRack());
190+
}
191+
return result;
192+
}
132193
}
133194
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package com.datastax.driver.core;
2+
3+
import static org.testng.Assert.assertEquals;
4+
import static org.testng.Assert.assertNotNull;
5+
import static org.testng.Assert.assertTrue;
6+
7+
import java.net.InetAddress;
8+
import java.net.UnknownHostException;
9+
import java.util.Arrays;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Set;
13+
14+
import com.google.common.collect.ImmutableList;
15+
import com.google.common.collect.ImmutableList.Builder;
16+
17+
/**
18+
* Base class for replication strategy tests. Currently only supports testing
19+
* using the Murmur3Partitioner, which is the default anyway
20+
*
21+
* @author davibo
22+
*
23+
*/
24+
public class AbstractReplicationStrategyTest {
25+
26+
private static final Token.Factory partitioner = Token.getFactory("Murmur3Partitioner");
27+
28+
protected static class HostMock extends Host {
29+
private String address;
30+
31+
private HostMock(String address) throws UnknownHostException {
32+
super(InetAddress.getByName(address), new ConvictionPolicy.Simple.Factory());
33+
this.address = address;
34+
}
35+
36+
private HostMock(String address, String dc, String rack) throws UnknownHostException {
37+
this(address);
38+
this.setLocationInfo(dc, rack);
39+
}
40+
41+
@Override
42+
public String toString() {
43+
return address;
44+
}
45+
46+
public String getMockAddress() {
47+
return address;
48+
}
49+
}
50+
51+
protected static Token.Factory partitioner() {
52+
return partitioner;
53+
}
54+
55+
/**
56+
* Convenience method to quickly create a mock host by a given address.
57+
* Specified address must be accessible, otherwise a RuntimeException is thrown
58+
*
59+
* @param address
60+
* @return
61+
*/
62+
protected static HostMock host(String address) {
63+
try {
64+
return new HostMock(address);
65+
} catch (UnknownHostException ex) {
66+
throw new RuntimeException(ex); //wrap to avoid declarations
67+
}
68+
}
69+
70+
/**
71+
* Convenience method to quickly create a mock host by the given address
72+
* located in the given datacenter/rack
73+
*
74+
* @param address
75+
* @param dc
76+
* @param rack
77+
* @return
78+
*/
79+
protected static HostMock host(String address, String dc, String rack) {
80+
try {
81+
return new HostMock(address, dc, rack);
82+
} catch (UnknownHostException ex) {
83+
throw new RuntimeException(ex); //wrap to avoid declarations
84+
}
85+
}
86+
87+
/**
88+
* Convenience method to cast a Host object into a MockHost.
89+
* Returns null if parameter host is not a mock
90+
*
91+
* @param host
92+
* @return
93+
*/
94+
protected static HostMock asMock(Host host) {
95+
return (host instanceof HostMock ? (HostMock)host : null);
96+
}
97+
98+
/**
99+
* Convenience method to quickly retrieve a mock host's address as specified
100+
* if created by the <code>host(...)</code> methods. Returns null if
101+
* given host is not a mock.
102+
*
103+
* @param host
104+
* @return
105+
*/
106+
protected static String mockAddress(Host host) {
107+
HostMock mock = asMock(host);
108+
return mock == null ? null : mock.getMockAddress();
109+
}
110+
111+
protected static Token token(String value) {
112+
return partitioner.fromString(value);
113+
}
114+
115+
protected static List<Token> tokens(String... values) {
116+
Builder<Token> builder = ImmutableList.<Token>builder();
117+
for (String value : values) {
118+
builder.add(token(value));
119+
}
120+
return builder.build();
121+
}
122+
123+
/**
124+
* Asserts that the replica map for a given token contains the expected list of replica hosts.
125+
* Hosts are checked in order, replica placement should be an ordered set
126+
*
127+
* @param replicaMap
128+
* @param token
129+
* @param expected
130+
*/
131+
protected static void assertReplicaPlacement(Map<Token, Set<Host>> replicaMap, Token token, String... expected) {
132+
Set<Host> replicaSet = replicaMap.get(token);
133+
assertNotNull(replicaSet);
134+
assertReplicasForToken(replicaSet, expected);
135+
}
136+
137+
/**
138+
* Checks if a given ordered set of replicas matches the expected list of replica hosts
139+
*
140+
* @param replicaSet
141+
* @param expected
142+
*/
143+
protected static void assertReplicasForToken(Set<Host> replicaSet, String... expected) {
144+
final String message = "Contents of replica set: " + replicaSet + " do not match expected hosts: " + Arrays.toString(expected);
145+
assertEquals(replicaSet.size(), expected.length, message);
146+
147+
int i = 0;
148+
for (Host hostReturned : replicaSet) {
149+
boolean match = true;
150+
151+
if (!expected[i++].equals(mockAddress(hostReturned))) {
152+
match = false;
153+
}
154+
assertTrue(match, message);
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)