Skip to content

Commit f008dbc

Browse files
committed
Prototype power of 2 choices algorithm for token-aware LBP
1 parent 7c93148 commit f008dbc

2 files changed

Lines changed: 47 additions & 8 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,20 @@ public Metrics getMetrics() {
420420
return manager.metrics;
421421
}
422422

423+
/**
424+
* TODO find a better way to expose this
425+
*/
426+
public int inFlight(Host host) {
427+
int inFlight = 0;
428+
for (SessionManager session : manager.sessions) {
429+
HostConnectionPool pool = session.pools.get(host);
430+
if (pool != null) {
431+
inFlight += pool.totalInFlight.get();
432+
}
433+
}
434+
return inFlight;
435+
}
436+
423437
/**
424438
* Registers the provided listener to be notified on hosts
425439
* up/down/added/removed events.

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.datastax.driver.core.*;
1919
import com.google.common.collect.AbstractIterator;
2020
import com.google.common.collect.Lists;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2123

2224
import java.nio.ByteBuffer;
2325
import java.util.*;
@@ -46,8 +48,18 @@
4648
*/
4749
public class TokenAwarePolicy implements ChainableLoadBalancingPolicy {
4850

51+
private static final Logger LOG = LoggerFactory.getLogger(TokenAwarePolicy.class);
52+
private static final boolean POWER_OF_TWO_CHOICES = Boolean.getBoolean("com.datastax.driver.POWER_OF_TWO_CHOICES");
53+
54+
static {
55+
if (POWER_OF_TWO_CHOICES) {
56+
LOG.info("Activating power of two choices");
57+
}
58+
}
59+
4960
private final LoadBalancingPolicy childPolicy;
5061
private final boolean shuffleReplicas;
62+
private volatile Cluster cluster;
5163
private volatile Metadata clusterMetadata;
5264
private volatile ProtocolVersion protocolVersion;
5365
private volatile CodecRegistry codecRegistry;
@@ -87,6 +99,7 @@ public LoadBalancingPolicy getChildPolicy() {
8799

88100
@Override
89101
public void init(Cluster cluster, Collection<Host> hosts) {
102+
this.cluster = cluster;
90103
clusterMetadata = cluster.getMetadata();
91104
protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
92105
codecRegistry = cluster.getConfiguration().getCodecRegistry();
@@ -130,14 +143,7 @@ public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement
130143
if (replicas.isEmpty())
131144
return childPolicy.newQueryPlan(loggedKeyspace, statement);
132145

133-
final Iterator<Host> iter;
134-
if (shuffleReplicas) {
135-
List<Host> l = Lists.newArrayList(replicas);
136-
Collections.shuffle(l);
137-
iter = l.iterator();
138-
} else {
139-
iter = replicas.iterator();
140-
}
146+
final Iterator<Host> iter = shuffle(replicas);
141147

142148
return new AbstractIterator<Host>() {
143149

@@ -165,6 +171,25 @@ protected Host computeNext() {
165171
};
166172
}
167173

174+
private Iterator<Host> shuffle(Set<Host> replicas) {
175+
if (shuffleReplicas) {
176+
List<Host> l = Lists.newArrayList(replicas);
177+
Collections.shuffle(l);
178+
if (POWER_OF_TWO_CHOICES && l.size() > 2) {
179+
// Order the first two hosts by increasing load
180+
Host host0 = l.get(0);
181+
Host host1 = l.get(1);
182+
if (cluster.inFlight(host0) > cluster.inFlight(host1)) {
183+
l.set(0, host1);
184+
l.set(1, host0);
185+
}
186+
}
187+
return l.iterator();
188+
} else {
189+
return replicas.iterator();
190+
}
191+
}
192+
168193
@Override
169194
public void onUp(Host host) {
170195
childPolicy.onUp(host);

0 commit comments

Comments
 (0)