Skip to content

Commit df467c4

Browse files
author
Alexandre Dutra
committed
JAVA-1448: TokenAwarePolicy should respect child policy ordering
1 parent cc6ec29 commit df467c4

4 files changed

Lines changed: 142 additions & 235 deletions

File tree

changelog/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
## Changelog
22

3+
### 3.3.0 (in progress)
4+
5+
- [improvement] JAVA-1448: TokenAwarePolicy should respect child policy ordering.
6+
7+
38
### 3.2.0
49

510
- [new feature] JAVA-1347: Add support for duration type.

driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java

Lines changed: 48 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,38 @@
1616
package com.datastax.driver.core.policies;
1717

1818
import com.datastax.driver.core.*;
19-
import com.google.common.collect.AbstractIterator;
20-
import com.google.common.collect.Lists;
2119

2220
import java.nio.ByteBuffer;
2321
import java.util.*;
2422

2523
/**
26-
* A wrapper load balancing policy that add token awareness to a child policy.
24+
* A wrapper load balancing policy that adds token awareness to a child policy.
2725
* <p/>
2826
* This policy encapsulates another policy. The resulting policy works in
2927
* the following way:
3028
* <ul>
3129
* <li>the {@code distance} method is inherited from the child policy.</li>
32-
* <li>the iterator return by the {@code newQueryPlan} method will first
33-
* return the {@code LOCAL} replicas for the query (based on {@link Statement#getRoutingKey})
34-
* <i>if possible</i> (i.e. if the query {@code getRoutingKey} method
35-
* doesn't return {@code null} and if {@link Metadata#getReplicas}
36-
* returns a non empty set of replicas for that partition key). If no
37-
* local replica can be either found or successfully contacted, the rest
38-
* of the query plan will fallback to one of the child policy.</li>
30+
* <li>the iterator returned by the {@code newQueryPlan} method will first
31+
* return the {@link HostDistance#LOCAL LOCAL} replicas for the query
32+
* <em>if possible</em> (i.e. if the query's
33+
* {@link Statement#getRoutingKey(ProtocolVersion, CodecRegistry) routing key}
34+
* is not {@code null} and if the
35+
* {@link Metadata#getReplicas(String, ByteBuffer) set of replicas}
36+
* for that partition key is not empty). If no local replica can be either found
37+
* or successfully contacted, the rest of the query plan will fallback
38+
* to the child policy's one.</li>
3939
* </ul>
4040
* <p/>
41-
* Do note that only replica for which the child policy {@code distance}
42-
* method returns {@code HostDistance.LOCAL} will be considered having
41+
* Do note that only replicas for which the child policy's
42+
* {@link LoadBalancingPolicy#distance(Host) distance}
43+
* method returns {@link HostDistance#LOCAL LOCAL} will be considered having
4344
* priority. For example, if you wrap {@link DCAwareRoundRobinPolicy} with this
4445
* token aware policy, replicas from remote data centers may only be
45-
* returned after all the host of the local data center.
46+
* returned after all the hosts of the local data center.
4647
*/
4748
public class TokenAwarePolicy implements ChainableLoadBalancingPolicy {
4849

4950
private final LoadBalancingPolicy childPolicy;
50-
private final boolean shuffleReplicas;
5151
private volatile Metadata clusterMetadata;
5252
private volatile ProtocolVersion protocolVersion;
5353
private volatile CodecRegistry codecRegistry;
@@ -56,28 +56,24 @@ public class TokenAwarePolicy implements ChainableLoadBalancingPolicy {
5656
* Creates a new {@code TokenAware} policy.
5757
*
5858
* @param childPolicy the load balancing policy to wrap with token awareness.
59-
* @param shuffleReplicas whether to shuffle the replicas returned by {@code getRoutingKey}.
60-
* Note that setting this parameter to {@code true} might decrease the
61-
* effectiveness of caching (especially at consistency level ONE), since
62-
* the same row will be retrieved from any replica (instead of only the
63-
* "primary" replica without shuffling).
64-
* On the other hand, shuffling will better distribute writes, and can
65-
* alleviate hotspots caused by "fat" partitions.
59+
* @param shuffleReplicas ignored.
60+
* @deprecated The parameter {@code shuffleReplicas} has been deprecated and has no effect anymore.
61+
* Use {@link #TokenAwarePolicy(LoadBalancingPolicy)} instead. This constructor will be removed
62+
* in the next major release.
6663
*/
67-
public TokenAwarePolicy(LoadBalancingPolicy childPolicy, boolean shuffleReplicas) {
64+
@SuppressWarnings("DeprecatedIsStillUsed")
65+
@Deprecated
66+
public TokenAwarePolicy(LoadBalancingPolicy childPolicy, @SuppressWarnings("unused") boolean shuffleReplicas) {
6867
this.childPolicy = childPolicy;
69-
this.shuffleReplicas = shuffleReplicas;
7068
}
7169

7270
/**
73-
* Creates a new {@code TokenAware} policy with shuffling of replicas.
71+
* Creates a new {@code TokenAware} policy.
7472
*
75-
* @param childPolicy the load balancing policy to wrap with token
76-
* awareness.
77-
* @see #TokenAwarePolicy(LoadBalancingPolicy, boolean)
73+
* @param childPolicy the load balancing policy to wrap with token awareness.
7874
*/
7975
public TokenAwarePolicy(LoadBalancingPolicy childPolicy) {
80-
this(childPolicy, true);
76+
this.childPolicy = childPolicy;
8177
}
8278

8379
@Override
@@ -94,26 +90,23 @@ public void init(Cluster cluster, Collection<Host> hosts) {
9490
}
9591

9692
/**
97-
* Return the HostDistance for the provided host.
98-
*
99-
* @param host the host of which to return the distance of.
100-
* @return the HostDistance to {@code host} as returned by the wrapped policy.
93+
* {@inheritDoc}
94+
* <p/>
95+
* This implementation always returns distances as reported by the wrapped policy.
10196
*/
10297
@Override
10398
public HostDistance distance(Host host) {
10499
return childPolicy.distance(host);
105100
}
106101

107102
/**
108-
* Returns the hosts to use for a new query.
103+
* {@inheritDoc}
109104
* <p/>
110-
* The returned plan will first return replicas (whose {@code HostDistance}
111-
* for the child policy is {@code LOCAL}) for the query if it can determine
112-
* them (i.e. mainly if {@code statement.getRoutingKey()} is not {@code null}).
113-
* Following what it will return the plan of the child policy.
114-
*
115-
* @param statement the query for which to build the plan.
116-
* @return the new query plan.
105+
* The returned plan will first return local replicas for the query (i.e.
106+
* replicas whose {@link HostDistance distance} according to the child policy is {@code LOCAL}),
107+
* if it can determine them (i.e. mainly if the statement's
108+
* {@link Statement#getRoutingKey(ProtocolVersion, CodecRegistry)} routing key}
109+
* is not {@code null}); following what it will return the child policy's original query plan.
117110
*/
118111
@Override
119112
public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement statement) {
@@ -126,43 +119,24 @@ public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement
126119
if (partitionKey == null || keyspace == null)
127120
return childPolicy.newQueryPlan(keyspace, statement);
128121

129-
final Set<Host> replicas = clusterMetadata.getReplicas(Metadata.quote(keyspace), partitionKey);
122+
Set<Host> replicas = clusterMetadata.getReplicas(Metadata.quote(keyspace), partitionKey);
123+
124+
Iterator<Host> childIterator = childPolicy.newQueryPlan(loggedKeyspace, statement);
125+
130126
if (replicas.isEmpty())
131-
return childPolicy.newQueryPlan(loggedKeyspace, statement);
132-
133-
final Iterator<Host> iter;
134-
if (shuffleReplicas) {
135-
List<Host> l = Lists.newArrayList(replicas);
136-
Collections.shuffle(l);
137-
iter = l.iterator();
138-
} else {
139-
iter = replicas.iterator();
127+
return childIterator;
128+
129+
List<Host> queryPlan = new ArrayList<Host>();
130+
int i = 0;
131+
while (childIterator.hasNext()) {
132+
Host host = childIterator.next();
133+
if (host.isUp() && childPolicy.distance(host) == HostDistance.LOCAL && replicas.contains(host))
134+
queryPlan.add(i++, host);
135+
else
136+
queryPlan.add(host);
140137
}
138+
return queryPlan.iterator();
141139

142-
return new AbstractIterator<Host>() {
143-
144-
private Iterator<Host> childIterator;
145-
146-
@Override
147-
protected Host computeNext() {
148-
while (iter.hasNext()) {
149-
Host host = iter.next();
150-
if (host.isUp() && childPolicy.distance(host) == HostDistance.LOCAL)
151-
return host;
152-
}
153-
154-
if (childIterator == null)
155-
childIterator = childPolicy.newQueryPlan(loggedKeyspace, statement);
156-
157-
while (childIterator.hasNext()) {
158-
Host host = childIterator.next();
159-
// Skip it if it was already a local replica
160-
if (!replicas.contains(host) || childPolicy.distance(host) != HostDistance.LOCAL)
161-
return host;
162-
}
163-
return endOfData();
164-
}
165-
};
166140
}
167141

168142
@Override

0 commit comments

Comments
 (0)