Skip to content

Commit 985168d

Browse files
committed
Handle contact points removed during init (JAVA-792).
1 parent 27ae7b8 commit 985168d

7 files changed

Lines changed: 140 additions & 86 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
- [improvement] implement UPDATE ... IF EXISTS in QueryBuilder (JAVA-827)
2828
- [improvement] Randomize contact points list to prevent hotspots (JAVA-618)
2929
- [improvement] Surface the coordinator used on query failure (JAVA-720)
30+
- [bug] Handle contact points removed during init (JAVA-792)
3031

3132
Merged from 2.0.10_fixes branch:
3233

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import com.google.common.base.Predicates;
3131
import com.google.common.collect.*;
3232
import com.google.common.util.concurrent.*;
33-
3433
import org.slf4j.Logger;
3534
import org.slf4j.LoggerFactory;
3635

@@ -1259,33 +1258,46 @@ synchronized void init() {
12591258
try {
12601259
while (true) {
12611260
try {
1261+
Collection<Host> allHosts = metadata.allHosts();
1262+
12621263
// At this stage, metadata.allHosts() only contains the contact points, that's what we want to pass to LBP.init().
12631264
// But the control connection will initialize first and discover more hosts, so make a copy.
1264-
Set<Host> contactPointHosts = Sets.newHashSet(metadata.allHosts());
1265+
Set<Host> contactPointHosts = Sets.newHashSet(allHosts);
12651266

12661267
controlConnection.connect();
12671268
if (connectionFactory.protocolVersion < 0)
12681269
connectionFactory.protocolVersion = 2;
12691270

1270-
// The control connection can mark hosts down if it failed to connect to them, separate them
1271+
// The control connection can mark hosts down if it failed to connect to them, or remove them if they weren't found
1272+
// in the control host's system.peers. Separate them:
12711273
Set<Host> downContactPointHosts = Sets.newHashSet();
1272-
for (Host host : contactPointHosts)
1273-
if (host.state == Host.State.DOWN)
1274+
Set<Host> removedContactPointHosts = Sets.newHashSet();
1275+
for (Host host : contactPointHosts) {
1276+
if (!allHosts.contains(host))
1277+
removedContactPointHosts.add(host);
1278+
else if (host.state == Host.State.DOWN)
12741279
downContactPointHosts.add(host);
1280+
}
1281+
contactPointHosts.removeAll(removedContactPointHosts);
12751282
contactPointHosts.removeAll(downContactPointHosts);
12761283

12771284
// Now that the control connection is ready, we have all the information we need about the nodes (datacenter,
12781285
// rack...) to initialize the load balancing policy
12791286
loadBalancingPolicy().init(Cluster.this, contactPointHosts);
12801287
speculativeRetryPolicy().init(Cluster.this);
12811288

1289+
for (Host host : removedContactPointHosts) {
1290+
loadBalancingPolicy().onRemove(host);
1291+
for (Host.StateListener listener : listeners)
1292+
listener.onRemove(host);
1293+
}
12821294
for (Host host : downContactPointHosts) {
12831295
loadBalancingPolicy().onDown(host);
12841296
for (Host.StateListener listener : listeners)
12851297
listener.onDown(host);
12861298
}
12871299

1288-
for (Host host : metadata.allHosts()) {
1300+
for (Host host : allHosts) {
12891301
// If the host is down at this stage, it's a contact point that the control connection failed to reach.
12901302
// Reconnection attempts are already scheduled, and the LBP and listeners have been notified above.
12911303
if (host.state == Host.State.DOWN) continue;
@@ -1898,7 +1910,7 @@ public void removeHost(Host host, boolean isInitialConnection) {
18981910

18991911
if (metadata.remove(host)) {
19001912
if (isInitialConnection) {
1901-
logger.warn("You listed {} in your contact points, but it could not be reached at startup", host);
1913+
logger.warn("You listed {} in your contact points, but it wasn't found in the control host's system.peers at startup", host);
19021914
} else {
19031915
logger.info("Cassandra host {} removed", host);
19041916
triggerOnRemove(host);

driver-core/src/test/java/com/datastax/driver/core/ClusterAssert.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18+
import java.net.InetSocketAddress;
1819
import java.util.Iterator;
1920
import java.util.Set;
2021
import java.util.TreeSet;
@@ -44,9 +45,17 @@ public ClusterAssert hasClosedControlConnection() {
4445
public HostAssert host(int hostNumber) {
4546
// TODO at some point this won't work anymore if we have assertions that wait for a node to
4647
// join the cluster, e.g. assertThat(cluster).node(3).comesUp().
47-
Host host = TestUtils.findHost(actual, hostNumber);
48+
return new HostAssert(
49+
TestUtils.findHost(actual, hostNumber),
50+
actual);
51+
}
4852

49-
return new HostAssert(host, actual);
53+
public HostAssert host(String hostAddress) {
54+
// TODO at some point this won't work anymore if we have assertions that wait for a node to
55+
// join the cluster, e.g. assertThat(cluster).node(3).comesUp().
56+
return new HostAssert(
57+
TestUtils.findHost(actual, hostAddress),
58+
actual);
5059
}
5160

5261
/**

driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java

Lines changed: 98 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,36 @@
1717

1818
import java.net.InetAddress;
1919
import java.net.InetSocketAddress;
20+
import java.net.UnknownHostException;
2021
import java.util.Collections;
2122
import java.util.List;
23+
import java.util.Map;
2224
import java.util.concurrent.TimeUnit;
2325
import java.util.concurrent.TimeoutException;
2426

27+
import com.google.common.collect.ImmutableMap;
28+
import com.google.common.collect.ImmutableSet;
2529
import com.google.common.collect.Lists;
30+
import org.scassandra.Scassandra;
31+
import org.scassandra.http.client.PrimingClient;
32+
import org.scassandra.http.client.PrimingRequest;
2633
import org.slf4j.Logger;
2734
import org.slf4j.LoggerFactory;
2835
import org.testng.annotations.Test;
2936

30-
import static org.assertj.core.api.Assertions.fail;
37+
import static org.mockito.Mockito.atLeast;
38+
import static org.mockito.Mockito.atMost;
3139
import static org.mockito.Mockito.spy;
32-
import static org.mockito.Mockito.times;
3340
import static org.mockito.Mockito.verify;
3441

3542
import com.datastax.driver.core.exceptions.NoHostAvailableException;
3643
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
37-
import com.datastax.driver.core.querybuilder.Insert;
38-
import com.datastax.driver.core.utils.UUIDs;
3944

45+
import static com.datastax.driver.core.Assertions.*;
4046
import static com.datastax.driver.core.FakeHost.Behavior.THROWING_CONNECT_TIMEOUTS;
41-
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
47+
import static com.datastax.driver.core.Host.State.DOWN;
48+
import static com.datastax.driver.core.Host.State.UP;
49+
import static com.datastax.driver.core.HostDistance.LOCAL;
4250

4351
public class ClusterInitTest {
4452
private static final Logger logger = LoggerFactory.getLogger(ClusterInitTest.class);
@@ -47,44 +55,58 @@ public class ClusterInitTest {
4755
* Test for JAVA-522: when the cluster and session initialize, if some contact points are behaving badly and
4856
* causing timeouts, we want to ensure that the driver does not wait multiple times on the same host.
4957
*/
50-
5158
@Test(groups = "short")
52-
public void should_wait_for_each_contact_point_at_most_once() {
53-
CCMBridge ccm = null;
59+
public void should_handle_failing_or_missing_contact_points() throws UnknownHostException {
5460
Cluster cluster = null;
55-
List<FakeHost> fakeHosts = Lists.newArrayList();
61+
Scassandra scassandra = null;
62+
List<FakeHost> failingHosts = Lists.newArrayList();
5663
try {
57-
// Obtaining connect timeouts is not trivial: we create a 6-host cluster but only start one of them,
58-
// then simulate the other 5.
59-
ccm = CCMBridge.builder("test").withNodes(6).notStarted().build();
60-
ccm.start(1);
61-
62-
for (int i = 0; i < 5; i++) {
63-
FakeHost fakeHost = new FakeHost(CCMBridge.ipOfNode(i + 2), 9042, THROWING_CONNECT_TIMEOUTS);
64-
fakeHosts.add(fakeHost);
65-
fakeHost.start();
64+
// Simulate a cluster of 5 hosts.
65+
66+
// - 1 is an actual Scassandra instance that will accept connections:
67+
scassandra = TestUtils.createScassandraServer();
68+
scassandra.start();
69+
String realHostAddress = "localhost";
70+
int port = scassandra.getBinaryPort();
71+
72+
// - the remaining 4 are fake servers that will throw connect timeouts:
73+
for (int i = 2; i <= 5; i++) {
74+
FakeHost failingHost = new FakeHost(CCMBridge.ipOfNode(i), port, THROWING_CONNECT_TIMEOUTS);
75+
failingHosts.add(failingHost);
76+
failingHost.start();
6677
}
6778

68-
// Our real instance has no rows in its system.peers table. That would cause the driver to ignore our fake
69-
// hosts when the control connection refreshes the host list.
70-
// So we also insert fake rows in system.peers:
71-
fakePeerRowsInNode1();
79+
// - we also have a "missing" contact point, i.e. there's no server listening at this address,
80+
// and the address is not listed in the live host's system.peers
81+
String missingHostAddress = CCMBridge.ipOfNode(6);
82+
83+
primePeerRows(scassandra, failingHosts);
7284

7385
logger.info("Environment is set up, starting test");
7486
long start = System.nanoTime();
7587

76-
// We want to count how many connections were attempted. For that, we rely on the fact that SocketOptions.getKeepAlive is called in Connection.Factory.newBoostrap()
77-
// each time we prepare to open a new connection. This is a bit of a hack, but this is what we have.
88+
// We want to count how many connections were attempted. For that, we rely on the fact that SocketOptions.getKeepAlive
89+
// is called in Connection.Factory.newBoostrap() each time we prepare to open a new connection.
7890
SocketOptions socketOptions = spy(new SocketOptions());
7991

80-
// Set an enormous delay so that reconnection attempts don't pollute our observations
92+
// Set an "infinite" reconnection delay so that reconnection attempts don't pollute our observations
8193
ConstantReconnectionPolicy reconnectionPolicy = new ConstantReconnectionPolicy(3600 * 1000);
8294

83-
cluster = Cluster.builder().addContactPoints(
84-
CCMBridge.ipOfNode(1), CCMBridge.ipOfNode(2), CCMBridge.ipOfNode(3),
85-
CCMBridge.ipOfNode(4), CCMBridge.ipOfNode(5), CCMBridge.ipOfNode(6))
95+
// Force 1 connection per pool. Otherwise we can't distinguish a failed pool creation from multiple connection
96+
// attempts, because pools create their connections in parallel (so 1 pool failure equals multiple connection failures).
97+
PoolingOptions poolingOptions = new PoolingOptions().setConnectionsPerHost(LOCAL, 1, 1);
98+
99+
cluster = Cluster.builder()
100+
.withPort(scassandra.getBinaryPort())
101+
.addContactPoints(
102+
realHostAddress,
103+
failingHosts.get(0).address, failingHosts.get(1).address,
104+
failingHosts.get(2).address, failingHosts.get(3).address,
105+
missingHostAddress
106+
)
86107
.withSocketOptions(socketOptions)
87108
.withReconnectionPolicy(reconnectionPolicy)
109+
.withPoolingOptions(poolingOptions)
88110
.withProtocolVersion(TestUtils.getDesiredProtocolVersion())
89111
.build();
90112
cluster.connect();
@@ -93,72 +115,80 @@ public void should_wait_for_each_contact_point_at_most_once() {
93115
long initTimeMs = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
94116
logger.info("Cluster and session initialized in {} ms", initTimeMs);
95117

96-
// We have one live host so we expect 1 control connection + core connection count successful connections.
97-
// The other 5 hosts are unreachable, we should attempt to connect to each of them only once.
98-
int coreConnections = cluster.getConfiguration()
99-
.getPoolingOptions()
100-
.getCoreConnectionsPerHost(HostDistance.LOCAL);
101-
verify(socketOptions, times(1 + coreConnections + 5)).getKeepAlive();
118+
// Expect :
119+
// - 2 connections for the live host (1 control connection + 1 pooled connection)
120+
// - 1 attempt per failing host (either a control connection attempt or a failed pool creation)
121+
// - 0 or 1 for the missing host. We can't know for sure because contact points are randomized. If it's tried
122+
// before the live host there will be a connection attempt, otherwise it will be removed directly because
123+
// it's not in the live host's system.peers.
124+
verify(socketOptions, atLeast(6)).getKeepAlive();
125+
verify(socketOptions, atMost(7)).getKeepAlive();
126+
127+
assertThat(cluster).host(realHostAddress).hasState(UP);
128+
for (FakeHost failingHost : failingHosts)
129+
assertThat(cluster).host(failingHost.address).hasState(DOWN);
130+
assertThat(cluster).host(missingHostAddress).isNull();
131+
102132
} finally {
103133
if (cluster != null)
104134
cluster.close();
105-
for (FakeHost fakeHost : fakeHosts)
135+
for (FakeHost fakeHost : failingHosts)
106136
fakeHost.stop();
107-
if (ccm != null)
108-
ccm.remove();
137+
if (scassandra != null)
138+
scassandra.stop();
109139
}
110140
}
111141

112142
/**
113-
* <p>
114143
* Validates that a Cluster that was never able to successfully establish connection a session can be closed
115144
* properly.
116145
*
117146
* @test_category connection
118147
* @expected_result Cluster closes within 1 second.
119148
*/
120-
@Test(groups="unit")
149+
@Test(groups = "unit")
121150
public void should_be_able_to_close_cluster_that_never_successfully_connected() throws Exception {
122151
Cluster cluster = Cluster.builder()
123-
.addContactPointsWithPorts(Collections.singleton(new InetSocketAddress("127.0.0.1", 65534)))
124-
.build();
152+
.addContactPointsWithPorts(Collections.singleton(new InetSocketAddress("127.0.0.1", 65534)))
153+
.build();
125154
try {
126155
cluster.connect();
127156
fail("Should not have been able to connect.");
128-
} catch(NoHostAvailableException e) {} // Expected.
157+
} catch (NoHostAvailableException e) {
158+
} // Expected.
129159
CloseFuture closeFuture = cluster.closeAsync();
130160
try {
131161
closeFuture.get(1, TimeUnit.SECONDS);
132-
} catch(TimeoutException e) {
162+
} catch (TimeoutException e) {
133163
fail("Close Future did not complete quickly.");
134164
}
135165
}
136166

137-
private void fakePeerRowsInNode1() {
138-
Cluster cluster = null;
139-
try {
140-
cluster = Cluster.builder().addContactPoint(CCMBridge.ipOfNode(1)).build();
141-
Session session = cluster.connect("system");
142-
143-
String releaseVersion = session.execute("SELECT release_version FROM local")
144-
.one().getString("release_version");
145-
146-
for (int i = 2; i <= 6; i++) {
147-
Insert insertStmt = insertInto("peers")
148-
.value("peer", InetAddress.getByName(CCMBridge.ipOfNode(i)))
149-
.value("data_center", "datacenter1")
150-
.value("host_id", UUIDs.random())
151-
.value("rack", "rack1")
152-
.value("release_version", releaseVersion)
153-
.value("rpc_address", InetAddress.getByName(CCMBridge.ipOfNode(i)))
154-
.value("schema_version", UUIDs.random());
155-
session.execute(insertStmt);
156-
}
157-
} catch (Exception e) {
158-
fail("Error while inserting fake peer rows", e);
159-
} finally {
160-
if (cluster != null)
161-
cluster.close();
167+
private void primePeerRows(Scassandra scassandra, List<FakeHost> otherHosts) throws UnknownHostException {
168+
PrimingClient primingClient = PrimingClient.builder()
169+
.withHost("localhost").withPort(scassandra.getAdminPort())
170+
.build();
171+
172+
List<Map<String, ?>> rows = Lists.newArrayListWithCapacity(5);
173+
174+
int i = 0;
175+
for (FakeHost otherHost : otherHosts) {
176+
InetAddress address = InetAddress.getByName(otherHost.address);
177+
rows.add(ImmutableMap.<String, Object>builder()
178+
.put("peer", address)
179+
.put("rpc_address", address)
180+
.put("data_center", "datacenter1")
181+
.put("rack", "rack1")
182+
.put("release_version", "2.0.1")
183+
.put("tokens", ImmutableSet.of(Long.toString(Long.MIN_VALUE + i++)))
184+
.build());
162185
}
186+
187+
primingClient.prime(
188+
PrimingRequest.queryBuilder()
189+
.withQuery("SELECT * FROM system.peers")
190+
.withColumnTypes(SCassandraCluster.SELECT_PEERS_COLUMN_TYPES)
191+
.withRows(rows)
192+
.build());
163193
}
164194
}

driver-core/src/test/java/com/datastax/driver/core/FakeHost.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
public class FakeHost {
3232
public enum Behavior {THROWING_CONNECT_TIMEOUTS, THROWING_OPERATION_TIMEOUTS}
3333

34-
private final String address;
34+
final String address;
3535
private final int port;
3636
private final Behavior behavior;
3737
private final ExecutorService executor;

driver-core/src/test/java/com/datastax/driver/core/SCassandraCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private void primePeers(PrimingClient primingClient, Scassandra toIgnore) {
134134
.build());
135135
}
136136

137-
private static final ImmutableMap<String, ColumnTypes> SELECT_PEERS_COLUMN_TYPES =
137+
static final ImmutableMap<String, ColumnTypes> SELECT_PEERS_COLUMN_TYPES =
138138
ImmutableMap.<String, ColumnTypes>builder()
139139
.put("peer", ColumnTypes.Inet)
140140
.put("rpc_address", ColumnTypes.Inet)

0 commit comments

Comments
 (0)