|
18 | 18 | import com.datastax.driver.core.*; |
19 | 19 | import com.google.common.collect.AbstractIterator; |
20 | 20 | import com.google.common.collect.Lists; |
| 21 | +import org.slf4j.Logger; |
| 22 | +import org.slf4j.LoggerFactory; |
21 | 23 |
|
22 | 24 | import java.nio.ByteBuffer; |
23 | 25 | import java.util.*; |
|
46 | 48 | */ |
47 | 49 | public class TokenAwarePolicy implements ChainableLoadBalancingPolicy { |
48 | 50 |
|
| 51 | + private static final Logger LOG = LoggerFactory.getLogger(TokenAwarePolicy.class); |
| 52 | + private static final boolean POWER_OF_TWO_CHOICES = Boolean.getBoolean("com.datastax.driver.POWER_OF_TWO_CHOICES"); |
| 53 | + |
| 54 | + static { |
| 55 | + if (POWER_OF_TWO_CHOICES) { |
| 56 | + LOG.info("Activating power of two choices"); |
| 57 | + } |
| 58 | + } |
| 59 | + |
49 | 60 | private final LoadBalancingPolicy childPolicy; |
50 | 61 | private final boolean shuffleReplicas; |
| 62 | + private volatile Cluster cluster; |
51 | 63 | private volatile Metadata clusterMetadata; |
52 | 64 | private volatile ProtocolVersion protocolVersion; |
53 | 65 | private volatile CodecRegistry codecRegistry; |
@@ -87,6 +99,7 @@ public LoadBalancingPolicy getChildPolicy() { |
87 | 99 |
|
88 | 100 | @Override |
89 | 101 | public void init(Cluster cluster, Collection<Host> hosts) { |
| 102 | + this.cluster = cluster; |
90 | 103 | clusterMetadata = cluster.getMetadata(); |
91 | 104 | protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); |
92 | 105 | codecRegistry = cluster.getConfiguration().getCodecRegistry(); |
@@ -130,14 +143,7 @@ public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement |
130 | 143 | if (replicas.isEmpty()) |
131 | 144 | return childPolicy.newQueryPlan(loggedKeyspace, statement); |
132 | 145 |
|
133 | | - final Iterator<Host> iter; |
134 | | - if (shuffleReplicas) { |
135 | | - List<Host> l = Lists.newArrayList(replicas); |
136 | | - Collections.shuffle(l); |
137 | | - iter = l.iterator(); |
138 | | - } else { |
139 | | - iter = replicas.iterator(); |
140 | | - } |
| 146 | + final Iterator<Host> iter = shuffle(replicas); |
141 | 147 |
|
142 | 148 | return new AbstractIterator<Host>() { |
143 | 149 |
|
@@ -165,6 +171,25 @@ protected Host computeNext() { |
165 | 171 | }; |
166 | 172 | } |
167 | 173 |
|
| 174 | + private Iterator<Host> shuffle(Set<Host> replicas) { |
| 175 | + if (shuffleReplicas) { |
| 176 | + List<Host> l = Lists.newArrayList(replicas); |
| 177 | + Collections.shuffle(l); |
| 178 | + if (POWER_OF_TWO_CHOICES && l.size() > 2) { |
| 179 | + // Order the first two hosts by increasing load |
| 180 | + Host host0 = l.get(0); |
| 181 | + Host host1 = l.get(1); |
| 182 | + if (cluster.inFlight(host0) > cluster.inFlight(host1)) { |
| 183 | + l.set(0, host1); |
| 184 | + l.set(1, host0); |
| 185 | + } |
| 186 | + } |
| 187 | + return l.iterator(); |
| 188 | + } else { |
| 189 | + return replicas.iterator(); |
| 190 | + } |
| 191 | + } |
| 192 | + |
168 | 193 | @Override |
169 | 194 | public void onUp(Host host) { |
170 | 195 | childPolicy.onUp(host); |
|
0 commit comments