1616package com .datastax .driver .core .policies ;
1717
1818import com .datastax .driver .core .*;
19- import com .google .common .collect .AbstractIterator ;
20- import com .google .common .collect .Lists ;
2119
2220import java .nio .ByteBuffer ;
2321import java .util .*;
2422
2523/**
26- * A wrapper load balancing policy that add token awareness to a child policy.
24+ * A wrapper load balancing policy that adds token awareness to a child policy.
2725 * <p/>
2826 * This policy encapsulates another policy. The resulting policy works in
2927 * the following way:
3028 * <ul>
3129 * <li>the {@code distance} method is inherited from the child policy.</li>
32- * <li>the iterator return by the {@code newQueryPlan} method will first
33- * return the {@code LOCAL} replicas for the query (based on {@link Statement#getRoutingKey})
34- * <i>if possible</i> (i.e. if the query {@code getRoutingKey} method
35- * doesn't return {@code null} and if {@link Metadata#getReplicas}
36- * returns a non empty set of replicas for that partition key). If no
37- * local replica can be either found or successfully contacted, the rest
38- * of the query plan will fallback to one of the child policy.</li>
30+ * <li>the iterator returned by the {@code newQueryPlan} method will first
31+ * return the {@link HostDistance#LOCAL LOCAL} replicas for the query
32+ * <em>if possible</em> (i.e. if the query's
33+ * {@link Statement#getRoutingKey(ProtocolVersion, CodecRegistry) routing key}
34+ * is not {@code null} and if the
35+ * {@link Metadata#getReplicas(String, ByteBuffer) set of replicas}
36+ * for that partition key is not empty). If no local replica can be either found
37+ * or successfully contacted, the rest of the query plan will fallback
38+ * to the child policy's one.</li>
3939 * </ul>
4040 * <p/>
41- * Do note that only replica for which the child policy {@code distance}
42- * method returns {@code HostDistance.LOCAL} will be considered having
41+ * Do note that only replicas for which the child policy's
42+ * {@link LoadBalancingPolicy#distance(Host) distance}
43+ * method returns {@link HostDistance#LOCAL LOCAL} will be considered having
4344 * priority. For example, if you wrap {@link DCAwareRoundRobinPolicy} with this
4445 * token aware policy, replicas from remote data centers may only be
45- * returned after all the host of the local data center.
46+ * returned after all the hosts of the local data center.
4647 */
4748public class TokenAwarePolicy implements ChainableLoadBalancingPolicy {
4849
4950 private final LoadBalancingPolicy childPolicy ;
50- private final boolean shuffleReplicas ;
5151 private volatile Metadata clusterMetadata ;
5252 private volatile ProtocolVersion protocolVersion ;
5353 private volatile CodecRegistry codecRegistry ;
@@ -56,28 +56,24 @@ public class TokenAwarePolicy implements ChainableLoadBalancingPolicy {
5656 * Creates a new {@code TokenAware} policy.
5757 *
5858 * @param childPolicy the load balancing policy to wrap with token awareness.
59- * @param shuffleReplicas whether to shuffle the replicas returned by {@code getRoutingKey}.
60- * Note that setting this parameter to {@code true} might decrease the
61- * effectiveness of caching (especially at consistency level ONE), since
62- * the same row will be retrieved from any replica (instead of only the
63- * "primary" replica without shuffling).
64- * On the other hand, shuffling will better distribute writes, and can
65- * alleviate hotspots caused by "fat" partitions.
59+ * @param shuffleReplicas ignored.
60+ * @deprecated The parameter {@code shuffleReplicas} has been deprecated and has no effect anymore.
61+ * Use {@link #TokenAwarePolicy(LoadBalancingPolicy)} instead. This constructor will be removed
62+ * in the next major release.
6663 */
67- public TokenAwarePolicy (LoadBalancingPolicy childPolicy , boolean shuffleReplicas ) {
64+ @ SuppressWarnings ("DeprecatedIsStillUsed" )
65+ @ Deprecated
66+ public TokenAwarePolicy (LoadBalancingPolicy childPolicy , @ SuppressWarnings ("unused" ) boolean shuffleReplicas ) {
6867 this .childPolicy = childPolicy ;
69- this .shuffleReplicas = shuffleReplicas ;
7068 }
7169
7270 /**
73- * Creates a new {@code TokenAware} policy with shuffling of replicas .
71+ * Creates a new {@code TokenAware} policy.
7472 *
75- * @param childPolicy the load balancing policy to wrap with token
76- * awareness.
77- * @see #TokenAwarePolicy(LoadBalancingPolicy, boolean)
73+ * @param childPolicy the load balancing policy to wrap with token awareness.
7874 */
7975 public TokenAwarePolicy (LoadBalancingPolicy childPolicy ) {
80- this ( childPolicy , true ) ;
76+ this . childPolicy = childPolicy ;
8177 }
8278
8379 @ Override
@@ -94,26 +90,23 @@ public void init(Cluster cluster, Collection<Host> hosts) {
9490 }
9591
9692 /**
97- * Return the HostDistance for the provided host.
98- *
99- * @param host the host of which to return the distance of.
100- * @return the HostDistance to {@code host} as returned by the wrapped policy.
93+ * {@inheritDoc}
94+ * <p/>
95+ * This implementation always returns distances as reported by the wrapped policy.
10196 */
10297 @ Override
10398 public HostDistance distance (Host host ) {
10499 return childPolicy .distance (host );
105100 }
106101
107102 /**
108- * Returns the hosts to use for a new query.
103+ * {@inheritDoc}
109104 * <p/>
110- * The returned plan will first return replicas (whose {@code HostDistance}
111- * for the child policy is {@code LOCAL}) for the query if it can determine
112- * them (i.e. mainly if {@code statement.getRoutingKey()} is not {@code null}).
113- * Following what it will return the plan of the child policy.
114- *
115- * @param statement the query for which to build the plan.
116- * @return the new query plan.
105+ * The returned plan will first return local replicas for the query (i.e.
106+ * replicas whose {@link HostDistance distance} according to the child policy is {@code LOCAL}),
107+ * if it can determine them (i.e. mainly if the statement's
108+ * {@link Statement#getRoutingKey(ProtocolVersion, CodecRegistry)} routing key}
109+ * is not {@code null}); following what it will return the child policy's original query plan.
117110 */
118111 @ Override
119112 public Iterator <Host > newQueryPlan (final String loggedKeyspace , final Statement statement ) {
@@ -126,43 +119,24 @@ public Iterator<Host> newQueryPlan(final String loggedKeyspace, final Statement
126119 if (partitionKey == null || keyspace == null )
127120 return childPolicy .newQueryPlan (keyspace , statement );
128121
129- final Set <Host > replicas = clusterMetadata .getReplicas (Metadata .quote (keyspace ), partitionKey );
122+ Set <Host > replicas = clusterMetadata .getReplicas (Metadata .quote (keyspace ), partitionKey );
123+
124+ Iterator <Host > childIterator = childPolicy .newQueryPlan (loggedKeyspace , statement );
125+
130126 if (replicas .isEmpty ())
131- return childPolicy .newQueryPlan (loggedKeyspace , statement );
132-
133- final Iterator <Host > iter ;
134- if (shuffleReplicas ) {
135- List <Host > l = Lists .newArrayList (replicas );
136- Collections .shuffle (l );
137- iter = l .iterator ();
138- } else {
139- iter = replicas .iterator ();
127+ return childIterator ;
128+
129+ List <Host > queryPlan = new ArrayList <Host >();
130+ int i = 0 ;
131+ while (childIterator .hasNext ()) {
132+ Host host = childIterator .next ();
133+ if (host .isUp () && childPolicy .distance (host ) == HostDistance .LOCAL && replicas .contains (host ))
134+ queryPlan .add (i ++, host );
135+ else
136+ queryPlan .add (host );
140137 }
138+ return queryPlan .iterator ();
141139
142- return new AbstractIterator <Host >() {
143-
144- private Iterator <Host > childIterator ;
145-
146- @ Override
147- protected Host computeNext () {
148- while (iter .hasNext ()) {
149- Host host = iter .next ();
150- if (host .isUp () && childPolicy .distance (host ) == HostDistance .LOCAL )
151- return host ;
152- }
153-
154- if (childIterator == null )
155- childIterator = childPolicy .newQueryPlan (loggedKeyspace , statement );
156-
157- while (childIterator .hasNext ()) {
158- Host host = childIterator .next ();
159- // Skip it if it was already a local replica
160- if (!replicas .contains (host ) || childPolicy .distance (host ) != HostDistance .LOCAL )
161- return host ;
162- }
163- return endOfData ();
164- }
165- };
166140 }
167141
168142 @ Override
0 commit comments