1616package com .datastax .driver .core ;
1717
1818import java .util .HashMap ;
19+ import java .util .HashSet ;
20+ import java .util .Iterator ;
21+ import java .util .LinkedHashSet ;
1922import java .util .List ;
2023import java .util .Map ;
2124import java .util .Set ;
@@ -32,14 +35,19 @@ abstract class ReplicationStrategy {
3235 static ReplicationStrategy create (Map <String , String > replicationOptions ) {
3336
3437 String strategyClass = replicationOptions .get ("class" );
35- String repFactorString = replicationOptions .get ("replication_factor" );
36- if (strategyClass == null | repFactorString == null )
38+ if (strategyClass == null )
3739 return null ;
3840
3941 try {
4042 if (strategyClass .contains ("SimpleStrategy" )) {
41- return new SimpleStrategy (Integer .parseInt (repFactorString ));
42- } else if (strategyClass .contains ("SimpleStrategy" )) {
43+ //replication_factor is only specified for SimpleStrategy
44+ String repFactorString = replicationOptions .get ("replication_factor" );
45+ if (repFactorString == null ) {
46+ return null ;
47+ } else {
48+ return new SimpleStrategy (Integer .parseInt (repFactorString ));
49+ }
50+ } else if (strategyClass .contains ("NetworkTopologyStrategy" )) {
4351 Map <String , Integer > dcRfs = new HashMap <String , Integer >();
4452 for (Map .Entry <String , String > entry : replicationOptions .entrySet ())
4553 {
@@ -79,10 +87,12 @@ Map<Token, Set<Host>> computeTokenToReplicaMap(Map<Token, Host> tokenToPrimary,
7987
8088 Map <Token , Set <Host >> replicaMap = new HashMap <Token , Set <Host >>(tokenToPrimary .size ());
8189 for (int i = 0 ; i < ring .size (); i ++) {
82- ImmutableSet .Builder <Host > builder = ImmutableSet .builder ();
83- for (int j = 0 ; j < rf ; j ++)
84- builder .add (tokenToPrimary .get (getTokenWrapping (i +j , ring )));
85- replicaMap .put (ring .get (i ), builder .build ());
90+ //handle consecutive sections in the ring assigned to the same host
91+ Set <Host > replicas = new LinkedHashSet <Host >();
92+ //we stop when reached the desired RF or ran out of nodes
93+ for (int j = 0 ; j < ring .size () && replicas .size () < rf ; j ++)
94+ replicas .add (tokenToPrimary .get (getTokenWrapping (i +j , ring )));
95+ replicaMap .put (ring .get (i ), ImmutableSet .<Host >builder ().addAll (replicas ).build ());
8696 }
8797 return replicaMap ;
8898 }
@@ -98,36 +108,87 @@ private NetworkTopologyStrategy(Map<String, Integer> replicationFactors) {
98108
99109 Map <Token , Set <Host >> computeTokenToReplicaMap (Map <Token , Host > tokenToPrimary , List <Token > ring ) {
100110
101- Map <Token , Set <Host >> replicaMap = new HashMap <Token , Set <Host >>(tokenToPrimary .size ());
111+ /*
112+ * This is essentially a copy of org.apache.cassandra.locator.NetworkTopologyStrategy
113+ */
114+
115+ final Map <String , Set <String >> racks = getRacksInDcs (tokenToPrimary .values ());
116+ final Map <Token , Set <Host >> replicaMap = new HashMap <Token , Set <Host >>(tokenToPrimary .size ());
102117 for (int i = 0 ; i < ring .size (); i ++) {
103- Map <String , Integer > remainings = new HashMap <String , Integer >(replicationFactors );
104- ImmutableSet .Builder <Host > builder = ImmutableSet .builder ();
105- for (int j = 0 ; j < ring .size (); j ++) {
106- Host h = tokenToPrimary .get (getTokenWrapping (j , ring ));
118+ Map <String , Set <Host >> allDcReplicas = new HashMap <String , Set <Host >>();
119+ Map <String , Set <String >> seenRacks = new HashMap <String , Set <String >>();
120+ Map <String , Set <Host >> skippedDcEndpoints = new HashMap <String , Set <Host >>();
121+ for (String dc : replicationFactors .keySet ()) {
122+ allDcReplicas .put (dc , new HashSet <Host >());
123+ seenRacks .put (dc , new HashSet <String >());
124+ //preserve order
125+ skippedDcEndpoints .put (dc , new LinkedHashSet <Host >());
126+ }
127+
128+ //preserve order - primary replica will be first
129+ Set <Host > replicas = new LinkedHashSet <Host >();
130+ //we stop the inner iteration if all DCs have enough replicas or we reach the end of the ring
131+ for (int j = 0 ; j < ring .size () && !allDone (allDcReplicas ); j ++) {
132+ Host h = tokenToPrimary .get (getTokenWrapping (i + j , ring ));
107133 String dc = h .getDatacenter ();
108134 if (dc == null )
109135 continue ;
110136
111- Integer remaining = remainings .get (dc );
112- if (remaining <= 0 )
137+ Integer rf = replicationFactors .get (dc );
138+ Set <Host > dcReplicas = allDcReplicas .get (dc );
139+ if (dcReplicas .size () >= rf )
113140 continue ;
114141
115- builder .add (h );
116- remainings .put (dc , remaining - 1 );
117- if (allDone (remainings ))
118- break ;
142+ //check if we already visited all racks in dc
143+ if (seenRacks .get (dc ).size () == racks .get (dc ).size ()) {
144+ replicas .add (h );
145+ dcReplicas .add (h );
146+ } else {
147+ String rack = h .getRack ();
148+ //is this a new rack?
149+ if (seenRacks .get (dc ).contains (rack )) {
150+ skippedDcEndpoints .get (dc ).add (h );
151+ } else {
152+ replicas .add (h );
153+ dcReplicas .add (h );
154+ seenRacks .get (dc ).add (rack );
155+ //check if we have run out of all racks
156+ //if yes, add all those nodes that we skipped so far
157+ if (seenRacks .get (dc ).size () == racks .get (dc ).size ()) {
158+ Iterator <Host > skippedIt = skippedDcEndpoints .get (dc ).iterator ();
159+ while (skippedIt .hasNext () && dcReplicas .size () < rf ) {
160+ Host nextSkipped = skippedIt .next ();
161+ replicas .add (nextSkipped );
162+ dcReplicas .add (nextSkipped );
163+ }
164+ }
165+ }
166+ }
119167 }
120- replicaMap .put (ring .get (i ), builder .build ());
168+ replicaMap .put (ring .get (i ), ImmutableSet .< Host > builder (). addAll ( replicas ) .build ());
121169 }
122170 return replicaMap ;
123171 }
124172
125- private boolean allDone (Map <String , Integer > map )
173+ private boolean allDone (Map <String , Set < Host > > map )
126174 {
127- for (Map .Entry <String , Integer > entry : map .entrySet ())
128- if (entry .getValue () > 0 )
175+ for (Map .Entry <String , Set < Host > > entry : map .entrySet ())
176+ if (entry .getValue (). size () < replicationFactors . get ( entry . getKey ()) )
129177 return false ;
130178 return true ;
131179 }
180+
181+ private Map <String , Set <String >> getRacksInDcs (Iterable <Host > hosts ) {
182+ Map <String , Set <String >> result = new HashMap <String , Set <String >>();
183+ for (Host host : hosts ) {
184+ Set <String > racks = result .get (host .getDatacenter ());
185+ if (racks == null ) {
186+ racks = new HashSet <String >();
187+ result .put (host .getDatacenter (), racks );
188+ }
189+ racks .add (host .getRack ());
190+ }
191+ return result ;
192+ }
132193 }
133194}
0 commit comments