Skip to content

Commit 4abf893

Browse files
Alexandre Dutraolim7t
authored andcommitted
JAVA-1193: Refresh token and replica metadata synchronously when schema is altered
This commit also introduces the following refactorings: 1) Cache and reuse host and token information whenever possible This commit modifies the TokenMap class so that it caches all the information about hosts and tokens from the last token map computation. This information can then be reused in a subsequent token map computation, if only the schema has changed, but not the topology. 2) Optimize calls to submitNodeListRefresh() In some circonstances, this method could be called recursively (i.e., triggered while already processing a node list refresh request). This commit suppresses the calls to submitNodeListRefresh() from ControlConnection and replaces them with calls to that method only when appropriate, i.e.: - When inside a node refresh, and the node has been added or removed; - When reconnecting, and the node has been added. The effect is that submitNodeListRefresh() is NOT called when in triggerOnAdd() nor in triggerOnRemove(), because we are already inside a node list refresh operation. 3) Minor optimizations This commit optimizes code around TokenMap: - Remove unnecessary field TokenMap.hosts - Rename fields in TokenMap for consistency - Avoid creating intermediary Set<String> instances by creating Set<Token> instances directly when reading rows from system.local and system.peers.
1 parent 74d0201 commit 4abf893

8 files changed

Lines changed: 178 additions & 183 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
- [improvement] JAVA-1227: Document "SELECT *" issue with prepared statement.
1919
- [bug] JAVA-1160: Fix NPE in VersionNumber.getPreReleaseLabels().
2020
- [improvement] JAVA-1126: Handle schema changes in Mapper.
21+
- [bug] JAVA-1193: Refresh token and replica metadata synchronously when schema is altered.
2122

2223

2324
### 3.0.2

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,9 +1851,10 @@ protected void onReconnection(Connection connection) {
18511851
if (controlConnection.refreshNodeInfo(host)) {
18521852
logger.debug("Successful reconnection to {}, setting host UP", host);
18531853
try {
1854-
if (isHostAddition)
1854+
if (isHostAddition) {
18551855
onAdd(host, connection);
1856-
else
1856+
submitNodeListRefresh();
1857+
} else
18571858
onUp(host, connection);
18581859
} catch (InterruptedException e) {
18591860
Thread.currentThread().interrupt();
@@ -2226,15 +2227,15 @@ ListenableFuture<Void> submitSchemaRefresh(final SchemaElement targetType, final
22262227
return schemaRefreshRequestDebouncer.eventReceived(request);
22272228
}
22282229

2229-
void submitNodeListRefresh() {
2230+
ListenableFuture<Void> submitNodeListRefresh() {
22302231
logger.trace("Submitting node list and token map refresh");
2231-
nodeListRefreshRequestDebouncer.eventReceived(new NodeListRefreshRequest());
2232+
return nodeListRefreshRequestDebouncer.eventReceived(new NodeListRefreshRequest());
22322233
}
22332234

2234-
void submitNodeRefresh(InetSocketAddress address, HostEvent eventType) {
2235+
ListenableFuture<Void> submitNodeRefresh(InetSocketAddress address, HostEvent eventType) {
22352236
NodeRefreshRequest request = new NodeRefreshRequest(address, eventType);
22362237
logger.trace("Submitting node refresh: {}", request);
2237-
nodeRefreshRequestDebouncer.eventReceived(request);
2238+
return nodeRefreshRequestDebouncer.eventReceived(request);
22382239
}
22392240

22402241
// refresh the schema using the provided connection, and notice the future with the provided resultset once done
@@ -2709,6 +2710,7 @@ private ExceptionCatchingRunnable hostAdded(final Host host) {
27092710
public void runMayThrow() throws Exception {
27102711
if (controlConnection.refreshNodeInfo(host)) {
27112712
onAdd(host, null);
2713+
submitNodeListRefresh();
27122714
} else {
27132715
logger.debug("Not enough info for {}, ignoring host", host);
27142716
}
@@ -2745,6 +2747,7 @@ public void runMayThrow() throws Exception {
27452747
if (metadata.remove(host)) {
27462748
logger.info("Cassandra host {} removed", host);
27472749
onRemove(host);
2750+
submitNodeListRefresh();
27482751
}
27492752
}
27502753
};

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -267,16 +267,15 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
267267

268268
// We need to refresh the node list first so we know about the cassandra version of
269269
// the node we're connecting to.
270+
// This will create the token map for the first time, but it will be incomplete
271+
// due to the lack of keyspace information
270272
refreshNodeListAndTokenMap(connection, cluster, isInitialConnection, true);
271273

274+
// refresh schema will also update the token map again,
275+
// this time with information about keyspaces
272276
logger.debug("[Control connection] Refreshing schema");
273277
refreshSchema(connection, null, null, null, null, cluster);
274278

275-
// We need to refresh the node list again;
276-
// We want that because the token map was not properly initialized by the first call above,
277-
// since it requires the list of keyspaces to be loaded.
278-
refreshNodeListAndTokenMap(connection, cluster, false, false);
279-
280279
return connection;
281280
} catch (BusyConnectionException e) {
282281
connection.closeAsync().force();
@@ -306,11 +305,6 @@ public void refreshSchema(SchemaElement targetType, String targetKeyspace, Strin
306305
if (c == null || c.isClosed())
307306
return;
308307
refreshSchema(c, targetType, targetKeyspace, targetName, signature, cluster);
309-
// If we rebuild all from scratch or have an updated keyspace, rebuild the token map
310-
// since some replication on some keyspace may have changed
311-
if ((targetType == null || targetType == KEYSPACE)) {
312-
cluster.submitNodeListRefresh();
313-
}
314308
} catch (ConnectionException e) {
315309
logger.debug("[Control connection] Connection error while refreshing schema ({})", e.getMessage());
316310
signalError();
@@ -545,7 +539,8 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
545539
connection.write(peersFuture);
546540

547541
String partitioner = null;
548-
Map<Host, Collection<String>> tokenMap = new HashMap<Host, Collection<String>>();
542+
Token.Factory factory = null;
543+
Map<Host, Set<Token>> tokenMap = new HashMap<Host, Set<Token>>();
549544

550545
// Update cluster name, DC and rack for the one node we are connected to
551546
Row localRow = localFuture.get().one();
@@ -555,8 +550,10 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
555550
cluster.metadata.clusterName = clusterName;
556551

557552
partitioner = localRow.getString("partitioner");
558-
if (partitioner != null)
553+
if (partitioner != null) {
559554
cluster.metadata.partitioner = partitioner;
555+
factory = Token.getFactory(partitioner);
556+
}
560557

561558
Host host = cluster.metadata.getHost(connection.address);
562559
// In theory host can't be null. However there is no point in risking a NPE in case we
@@ -565,10 +562,12 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
565562
logger.debug("Host in local system table ({}) unknown to us (ok if said host just got removed)", connection.address);
566563
} else {
567564
updateInfo(host, localRow, cluster, isInitialConnection);
568-
if (metadataEnabled) {
569-
Set<String> tokens = localRow.getSet("tokens", String.class);
570-
if (partitioner != null && !tokens.isEmpty())
565+
if (metadataEnabled && factory != null) {
566+
Set<String> tokensStr = localRow.getSet("tokens", String.class);
567+
if (!tokensStr.isEmpty()) {
568+
Set<Token> tokens = toTokens(factory, tokensStr);
571569
tokenMap.put(host, tokens);
570+
}
572571
}
573572
}
574573
}
@@ -579,7 +578,7 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
579578
List<String> cassandraVersions = new ArrayList<String>();
580579
List<InetAddress> broadcastAddresses = new ArrayList<InetAddress>();
581580
List<InetAddress> listenAddresses = new ArrayList<InetAddress>();
582-
List<Set<String>> allTokens = new ArrayList<Set<String>>();
581+
List<Set<Token>> allTokens = new ArrayList<Set<Token>>();
583582
List<String> dseVersions = new ArrayList<String>();
584583
List<Boolean> dseGraphEnabled = new ArrayList<Boolean>();
585584
List<String> dseWorkloads = new ArrayList<String>();
@@ -596,8 +595,14 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
596595
racks.add(row.getString("rack"));
597596
cassandraVersions.add(row.getString("release_version"));
598597
broadcastAddresses.add(row.getInet("peer"));
599-
if (metadataEnabled)
600-
allTokens.add(row.getSet("tokens", String.class));
598+
if (metadataEnabled && factory != null) {
599+
Set<String> tokensStr = row.getSet("tokens", String.class);
600+
Set<Token> tokens = null;
601+
if (!tokensStr.isEmpty()) {
602+
tokens = toTokens(factory, tokensStr);
603+
}
604+
allTokens.add(tokens);
605+
}
601606
InetAddress listenAddress = row.getColumnDefinitions().contains("listen_address") ? row.getInet("listen_address") : null;
602607
listenAddresses.add(listenAddress);
603608
String dseWorkload = row.getColumnDefinitions().contains("workload") ? row.getString("workload") : null;
@@ -640,7 +645,7 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
640645
if (dseGraphEnabled.get(i) != null)
641646
host.setDseGraphEnabled(dseGraphEnabled.get(i));
642647

643-
if (metadataEnabled && partitioner != null && !allTokens.get(i).isEmpty())
648+
if (metadataEnabled && factory != null && allTokens.get(i) != null)
644649
tokenMap.put(host, allTokens.get(i));
645650

646651
if (isNew && !isInitialConnection)
@@ -653,8 +658,16 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
653658
if (!host.getSocketAddress().equals(connection.address) && !foundHostsSet.contains(host.getSocketAddress()))
654659
cluster.removeHost(host, isInitialConnection);
655660

656-
if (metadataEnabled)
657-
cluster.metadata.rebuildTokenMap(partitioner, tokenMap);
661+
if (metadataEnabled && factory != null && !tokenMap.isEmpty())
662+
cluster.metadata.rebuildTokenMap(factory, tokenMap);
663+
}
664+
665+
private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr) {
666+
Set<Token> tokens = new LinkedHashSet<Token>(tokensStr.size());
667+
for (String tokenStr : tokensStr) {
668+
tokens.add(factory.fromString(tokenStr));
669+
}
670+
return tokens;
658671
}
659672

660673
private static boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
@@ -756,13 +769,15 @@ boolean isOpen() {
756769
public void onUp(Host host) {
757770
}
758771

772+
public void onAdd(Host host) {
773+
}
774+
759775
public void onDown(Host host) {
760776
onHostGone(host);
761777
}
762778

763779
public void onRemove(Host host) {
764780
onHostGone(host);
765-
cluster.submitNodeListRefresh();
766781
}
767782

768783
private void onHostGone(Host host) {
@@ -783,11 +798,4 @@ public void onConnectionDefunct(Connection connection) {
783798
backgroundReconnect(0);
784799
}
785800

786-
public void onAdd(Host host) {
787-
// Refresh infos and token map if we didn't knew about that host, i.e. if we either don't have basic infos on it,
788-
// or it's not part of our computed token map
789-
Metadata.TokenMap tkmap = cluster.metadata.tokenMap;
790-
if (host.getCassandraVersion() == null || tkmap == null || !tkmap.hosts.contains(host))
791-
cluster.submitNodeListRefresh();
792-
}
793801
}

driver-core/src/main/java/com/datastax/driver/core/Metadata.java

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class Metadata {
4141
volatile String partitioner;
4242
private final ConcurrentMap<InetSocketAddress, Host> hosts = new ConcurrentHashMap<InetSocketAddress, Host>();
4343
final ConcurrentMap<String, KeyspaceMetadata> keyspaces = new ConcurrentHashMap<String, KeyspaceMetadata>();
44-
volatile TokenMap tokenMap;
44+
private volatile TokenMap tokenMap;
4545

4646
final ReentrantLock lock = new ReentrantLock();
4747

@@ -62,18 +62,28 @@ public class Metadata {
6262
this.cluster = cluster;
6363
}
6464

65-
void rebuildTokenMap(String partitioner, Map<Host, Collection<String>> allTokens) {
65+
// rebuilds the token map with the current hosts, typically when refreshing schema metadata
66+
void rebuildTokenMap() {
6667
lock.lock();
6768
try {
68-
if (allTokens.isEmpty())
69-
return;
70-
71-
Token.Factory factory = partitioner == null
72-
? (tokenMap == null ? null : tokenMap.factory)
73-
: Token.getFactory(partitioner);
74-
if (factory == null)
69+
if (tokenMap == null)
7570
return;
71+
this.tokenMap = TokenMap.build(
72+
tokenMap.factory,
73+
tokenMap.primaryToTokens,
74+
keyspaces.values(),
75+
tokenMap.ring,
76+
tokenMap.tokenRanges,
77+
tokenMap.tokenToPrimary);
78+
} finally {
79+
lock.unlock();
80+
}
81+
}
7682

83+
// rebuilds the token map for a new set of hosts, typically when refreshing nodes list
84+
void rebuildTokenMap(Token.Factory factory, Map<Host, Set<Token>> allTokens) {
85+
lock.lock();
86+
try {
7787
this.tokenMap = TokenMap.build(factory, allTokens, keyspaces.values());
7888
} finally {
7989
lock.unlock();
@@ -264,7 +274,7 @@ public Set<TokenRange> getTokenRanges(String keyspace, Host host) {
264274
if (current == null) {
265275
return Collections.emptySet();
266276
} else {
267-
Map<Host, Set<TokenRange>> dcRanges = current.hostsToRanges.get(keyspace);
277+
Map<Host, Set<TokenRange>> dcRanges = current.hostsToRangesByKeyspace.get(keyspace);
268278
if (dcRanges == null) {
269279
return Collections.emptySet();
270280
} else {
@@ -384,7 +394,7 @@ public KeyspaceMetadata getKeyspace(String keyspace) {
384394
KeyspaceMetadata removeKeyspace(String keyspace) {
385395
KeyspaceMetadata removed = keyspaces.remove(keyspace);
386396
if (tokenMap != null)
387-
tokenMap.tokenToHosts.remove(keyspace);
397+
tokenMap.tokenToHostsByKeyspace.remove(keyspace);
388398
return removed;
389399
}
390400

@@ -609,61 +619,57 @@ void triggerOnMaterializedViewRemoved(MaterializedViewMetadata view) {
609619
}
610620
}
611621

612-
static class TokenMap {
622+
private static class TokenMap {
613623

614624
private final Token.Factory factory;
615-
private final Map<String, Map<Token, Set<Host>>> tokenToHosts;
616-
private final Map<String, Map<Host, Set<TokenRange>>> hostsToRanges;
625+
private final Map<Host, Set<Token>> primaryToTokens;
626+
private final Map<String, Map<Token, Set<Host>>> tokenToHostsByKeyspace;
627+
private final Map<String, Map<Host, Set<TokenRange>>> hostsToRangesByKeyspace;
617628
private final List<Token> ring;
618629
private final Set<TokenRange> tokenRanges;
619-
final Set<Host> hosts;
630+
private final Map<Token, Host> tokenToPrimary;
620631

621632
private TokenMap(Token.Factory factory,
633+
List<Token> ring,
634+
Set<TokenRange> tokenRanges,
635+
Map<Token, Host> tokenToPrimary,
622636
Map<Host, Set<Token>> primaryToTokens,
623-
Map<String, Map<Token, Set<Host>>> tokenToHosts,
624-
Map<String, Map<Host, Set<TokenRange>>> hostsToRanges,
625-
List<Token> ring, Set<TokenRange> tokenRanges, Set<Host> hosts) {
637+
Map<String, Map<Token, Set<Host>>> tokenToHostsByKeyspace,
638+
Map<String, Map<Host, Set<TokenRange>>> hostsToRangesByKeyspace) {
626639
this.factory = factory;
627-
this.tokenToHosts = tokenToHosts;
628-
this.hostsToRanges = hostsToRanges;
629640
this.ring = ring;
630641
this.tokenRanges = tokenRanges;
631-
this.hosts = hosts;
642+
this.tokenToPrimary = tokenToPrimary;
643+
this.primaryToTokens = primaryToTokens;
644+
this.tokenToHostsByKeyspace = tokenToHostsByKeyspace;
645+
this.hostsToRangesByKeyspace = hostsToRangesByKeyspace;
632646
for (Map.Entry<Host, Set<Token>> entry : primaryToTokens.entrySet()) {
633647
Host host = entry.getKey();
634648
host.setTokens(ImmutableSet.copyOf(entry.getValue()));
635649
}
636650
}
637651

638-
public static TokenMap build(Token.Factory factory, Map<Host, Collection<String>> allTokens, Collection<KeyspaceMetadata> keyspaces) {
639-
640-
Set<Host> hosts = allTokens.keySet();
652+
private static TokenMap build(Token.Factory factory, Map<Host, Set<Token>> allTokens, Collection<KeyspaceMetadata> keyspaces) {
641653
Map<Token, Host> tokenToPrimary = new HashMap<Token, Host>();
642-
Map<Host, Set<Token>> primaryToTokens = new HashMap<Host, Set<Token>>();
643654
Set<Token> allSorted = new TreeSet<Token>();
644-
645-
for (Map.Entry<Host, Collection<String>> entry : allTokens.entrySet()) {
655+
for (Map.Entry<Host, ? extends Collection<Token>> entry : allTokens.entrySet()) {
646656
Host host = entry.getKey();
647-
for (String tokenStr : entry.getValue()) {
657+
for (Token t : entry.getValue()) {
648658
try {
649-
Token t = factory.fromString(tokenStr);
650659
allSorted.add(t);
651660
tokenToPrimary.put(t, host);
652-
Set<Token> hostTokens = primaryToTokens.get(host);
653-
if (hostTokens == null) {
654-
hostTokens = new HashSet<Token>();
655-
primaryToTokens.put(host, hostTokens);
656-
}
657-
hostTokens.add(t);
658661
} catch (IllegalArgumentException e) {
659662
// If we failed parsing that token, skip it
660663
}
661664
}
662665
}
663-
664666
List<Token> ring = new ArrayList<Token>(allSorted);
665667
Set<TokenRange> tokenRanges = makeTokenRanges(ring, factory);
668+
return build(factory, allTokens, keyspaces, ring, tokenRanges, tokenToPrimary);
669+
}
666670

671+
private static TokenMap build(Token.Factory factory, Map<Host, Set<Token>> allTokens, Collection<KeyspaceMetadata> keyspaces, List<Token> ring, Set<TokenRange> tokenRanges, Map<Token, Host> tokenToPrimary) {
672+
Set<Host> hosts = allTokens.keySet();
667673
Map<String, Map<Token, Set<Host>>> tokenToHosts = new HashMap<String, Map<Token, Set<Host>>>();
668674
Map<ReplicationStrategy, Map<Token, Set<Host>>> replStrategyToHosts = new HashMap<ReplicationStrategy, Map<Token, Set<Host>>>();
669675
Map<String, Map<Host, Set<TokenRange>>> hostsToRanges = new HashMap<String, Map<Host, Set<TokenRange>>>();
@@ -691,17 +697,17 @@ public static TokenMap build(Token.Factory factory, Map<Host, Collection<String>
691697
}
692698
hostsToRanges.put(keyspace.getName(), ksRanges);
693699
}
694-
return new TokenMap(factory, primaryToTokens, tokenToHosts, hostsToRanges, ring, tokenRanges, hosts);
700+
return new TokenMap(factory, ring, tokenRanges, tokenToPrimary, allTokens, tokenToHosts, hostsToRanges);
695701
}
696702

697703
private Set<Host> getReplicas(String keyspace, Token token) {
698704

699-
Map<Token, Set<Host>> keyspaceHosts = tokenToHosts.get(keyspace);
700-
if (keyspaceHosts == null)
705+
Map<Token, Set<Host>> tokenToHosts = tokenToHostsByKeyspace.get(keyspace);
706+
if (tokenToHosts == null)
701707
return Collections.emptySet();
702708

703709
// If the token happens to be one of the "primary" tokens, get result directly
704-
Set<Host> hosts = keyspaceHosts.get(token);
710+
Set<Host> hosts = tokenToHosts.get(token);
705711
if (hosts != null)
706712
return hosts;
707713

@@ -713,7 +719,7 @@ private Set<Host> getReplicas(String keyspace, Token token) {
713719
i = 0;
714720
}
715721

716-
return keyspaceHosts.get(ring.get(i));
722+
return tokenToHosts.get(ring.get(i));
717723
}
718724

719725
private static Map<Token, Set<Host>> makeNonReplicatedMap(Map<Token, Host> input) {

0 commit comments

Comments
 (0)