Skip to content

Commit d8b0bfa

Browse files
Alexandre Dutraolim7t
authored andcommitted
JAVA-657: Debounce control connection queries.
Includes squashed contributions from olim7t and tolbertam.
1 parent 2424fce commit d8b0bfa

17 files changed

Lines changed: 1170 additions & 133 deletions

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
- [improvement] Provide an option to not re-prepare all statements in onUp (JAVA-658)
3636
- [improvement] Customizable creation of netty timer (JAVA-853)
3737
- [bug] Avoid quadratic ring processing with invalid replication factors (JAVA-859)
38+
- [improvement] Debounce control connection queries (JAVA-657)
3839

3940
Merged from 2.0.10_fixes branch:
4041

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 318 additions & 90 deletions
Large diffs are not rendered by default.

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
import java.net.InetSocketAddress;
2020
import java.net.UnknownHostException;
2121
import java.util.*;
22-
import java.util.concurrent.ExecutionException;
23-
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.*;
2423
import java.util.concurrent.atomic.AtomicReference;
2524

2625
import com.google.common.base.Objects;
@@ -258,11 +257,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
258257
// the node we're connecting to.
259258
refreshNodeListAndTokenMap(connection, cluster, isInitialConnection, true);
260259

261-
// Note that refreshing the schema will trigger refreshNodeListAndTokenMap since table == null
262-
// We want that because the token map was not properly initialized by the first call above, since it requires the list of keyspaces
263-
// to be loaded.
264260
logger.debug("[Control connection] Refreshing schema");
265-
refreshSchema(connection, null, null, cluster, isInitialConnection);
261+
refreshSchema(connection, null, null, cluster);
262+
263+
// We need to refresh the node list again;
264+
// We want that because the token map was not properly initialized by the first call above,
265+
// since it requires the list of keyspaces to be loaded.
266+
refreshNodeListAndTokenMap(connection, cluster, false, true);
267+
266268
return connection;
267269
} catch (BusyConnectionException e) {
268270
connection.closeAsync().force();
@@ -283,13 +285,22 @@ private Connection tryConnect(Host host, boolean isInitialConnection) throws Con
283285
}
284286

285287
public void refreshSchema(String keyspace, String table) throws InterruptedException {
286-
logger.debug("[Control connection] Refreshing schema for {}{}", keyspace == null ? "" : keyspace, table == null ? "" : '.' + table);
288+
if(keyspace == null && table == null) {
289+
logger.debug("[Control connection] Refreshing schema");
290+
} else {
291+
logger.debug("[Control connection] Refreshing schema for {}{}", keyspace == null ? "" : keyspace, table == null ? "" : '.' + table);
292+
}
287293
try {
288294
Connection c = connectionRef.get();
289295
// At startup, when we add the initial nodes, this will be null, which is ok
290296
if (c == null)
291297
return;
292-
refreshSchema(c, keyspace, table, cluster, false);
298+
refreshSchema(c, keyspace, table, cluster);
299+
// If the table is null, we either rebuild all from scratch or have an updated keyspace. In both cases, rebuild the token map
300+
// since some replication on some keyspace may have changed
301+
if (table == null) {
302+
cluster.submitNodeListRefresh();
303+
}
293304
} catch (ConnectionException e) {
294305
logger.debug("[Control connection] Connection error while refreshing schema ({})", e.getMessage());
295306
signalError();
@@ -304,7 +315,7 @@ public void refreshSchema(String keyspace, String table) throws InterruptedExcep
304315
}
305316
}
306317

307-
static void refreshSchema(Connection connection, String keyspace, String table, Cluster.Manager cluster, boolean isInitialConnection) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
318+
static void refreshSchema(Connection connection, String keyspace, String table, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
308319
// Make sure we're up to date on schema
309320
String whereClause = "";
310321
if (keyspace != null) {
@@ -345,11 +356,6 @@ static void refreshSchema(Connection connection, String keyspace, String table,
345356
// So log, but let things go otherwise.
346357
logger.error("Error parsing schema from Cassandra system tables: the schema in Cluster#getMetadata() will appear incomplete or stale", e);
347358
}
348-
349-
// If the table is null, we either rebuild all from scratch or have an updated keyspace. In both case, rebuild the token map
350-
// since some replication on some keyspace may have changed
351-
if (table == null)
352-
refreshNodeListAndTokenMap(connection, cluster, false, false);
353359
}
354360

355361
public void refreshNodeListAndTokenMap() {
@@ -358,7 +364,6 @@ public void refreshNodeListAndTokenMap() {
358364
if (c == null)
359365
return;
360366

361-
logger.debug("[Control connection] Refreshing node list and token map");
362367
try {
363368
refreshNodeListAndTokenMap(c, cluster, false, true);
364369
} catch (ConnectionException e) {
@@ -591,14 +596,13 @@ private static void refreshNodeListAndTokenMap(Connection connection, Cluster.Ma
591596
cluster.metadata.rebuildTokenMap(partitioner, tokenMap);
592597
}
593598

594-
static boolean waitForSchemaAgreement(Connection connection, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
595-
599+
boolean waitForSchemaAgreement() throws ConnectionException, BusyConnectionException, ExecutionException, InterruptedException {
596600
long start = System.nanoTime();
597601
long elapsed = 0;
598602
int maxSchemaAgreementWaitSeconds = cluster.configuration.getProtocolOptions().getMaxSchemaAgreementWaitSeconds();
599603
while (elapsed < maxSchemaAgreementWaitSeconds * 1000) {
600604

601-
if (checkSchemaAgreement(connection, cluster))
605+
if (checkSchemaAgreement())
602606
return true;
603607

604608
// let's not flood the node too much
@@ -610,7 +614,11 @@ static boolean waitForSchemaAgreement(Connection connection, Cluster.Manager clu
610614
return false;
611615
}
612616

613-
private static boolean checkSchemaAgreement(Connection connection, Cluster.Manager cluster) throws ConnectionException, BusyConnectionException, InterruptedException, ExecutionException {
617+
boolean checkSchemaAgreement() throws ConnectionException, BusyConnectionException, InterruptedException, ExecutionException {
618+
Connection connection = connectionRef.get();
619+
if (connection == null || connection.isClosed())
620+
return false;
621+
614622
DefaultResultSetFuture peersFuture = new DefaultResultSetFuture(null, new Requests.Query(SELECT_SCHEMA_PEERS));
615623
DefaultResultSetFuture localFuture = new DefaultResultSetFuture(null, new Requests.Query(SELECT_SCHEMA_LOCAL));
616624
connection.write(peersFuture);
@@ -636,16 +644,6 @@ private static boolean checkSchemaAgreement(Connection connection, Cluster.Manag
636644
return versions.size() <= 1;
637645
}
638646

639-
boolean checkSchemaAgreement() {
640-
Connection c = connectionRef.get();
641-
try {
642-
return c != null && checkSchemaAgreement(c, cluster);
643-
} catch (Exception e) {
644-
logger.warn("Error while checking schema agreement", e);
645-
return false;
646-
}
647-
}
648-
649647
boolean isOpen() {
650648
Connection c = connectionRef.get();
651649
return c != null && !c.isClosed();
@@ -679,7 +677,7 @@ public void onAdd(Host host) {
679677
// or it's not part of our computed token map
680678
Metadata.TokenMap tkmap = cluster.metadata.tokenMap;
681679
if (host.getCassandraVersion() == null || tkmap == null || !tkmap.hosts.contains(host))
682-
refreshNodeListAndTokenMap();
680+
cluster.submitNodeListRefresh();
683681
}
684682

685683
@Override
@@ -693,6 +691,6 @@ public void onRemove(Host host) {
693691
backgroundReconnect(0);
694692
}
695693

696-
refreshNodeListAndTokenMap();
694+
cluster.submitNodeListRefresh();
697695
}
698696
}

0 commit comments

Comments
 (0)