Skip to content

Commit efbf9bf

Browse files
committed
Merge pull request apache#513 from datastax/itest-grooming
Integration Test Grooming
2 parents cd294d9 + d2d1f0e commit efbf9bf

11 files changed

Lines changed: 155 additions & 55 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/AbstractReconnectionHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.concurrent.*;
1919
import java.util.concurrent.atomic.AtomicReference;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.util.concurrent.ListenableFuture;
2223
import com.google.common.util.concurrent.AbstractFuture;
2324
import org.slf4j.Logger;
@@ -183,7 +184,8 @@ private void reschedule(long nextDelay) {
183184
}
184185

185186
// The future that the handler exposes to its clients via currentAttempt
186-
private static class HandlerFuture extends AbstractFuture<Void> {
187+
@VisibleForTesting
188+
static class HandlerFuture extends AbstractFuture<Void> {
187189
// A future representing completion of the next task submitted to the executor
188190
volatile ScheduledFuture<?> nextTry;
189191

driver-core/src/test/java/com/datastax/driver/core/AbstractReconnectionHandlerTest.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,20 @@
2525
import com.google.common.util.concurrent.Futures;
2626
import com.google.common.util.concurrent.ListenableFuture;
2727
import com.google.common.util.concurrent.SettableFuture;
28+
import com.google.common.util.concurrent.Uninterruptibles;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031
import org.testng.annotations.AfterMethod;
3132
import org.testng.annotations.BeforeMethod;
3233
import org.testng.annotations.Test;
3334

3435
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.mockito.Mockito.spy;
37+
import static org.mockito.Mockito.timeout;
38+
import static org.mockito.Mockito.verify;
3539
import static org.testng.Assert.fail;
3640

41+
import com.datastax.driver.core.AbstractReconnectionHandler.HandlerFuture;
3742
import com.datastax.driver.core.AbstractReconnectionHandlerTest.MockReconnectionWork.ReconnectBehavior;
3843
import com.datastax.driver.core.policies.ReconnectionPolicy.ReconnectionSchedule;
3944

@@ -48,7 +53,7 @@ public class AbstractReconnectionHandlerTest {
4853

4954
@BeforeMethod(groups = {"unit", "short"})
5055
public void setup() {
51-
executor = Executors.newScheduledThreadPool(2);
56+
executor = spy(Executors.newScheduledThreadPool(2));
5257
schedule = new MockReconnectionSchedule();
5358
work = new MockReconnectionWork();
5459
future.set(null);
@@ -133,6 +138,9 @@ public void should_stop_if_cancelled_before_first_attempt() {
133138
public void should_stop_if_cancelled_between_attempts() {
134139
handler.start();
135140

141+
// Wait for the initial schedule of a reconnect.
142+
verify(executor, timeout(1000)).schedule(handler, 0, TimeUnit.MILLISECONDS);
143+
136144
// Force a failed reconnect.
137145
schedule.tick();
138146
work.nextReconnect = ReconnectBehavior.THROW_EXCEPTION;
@@ -143,17 +151,30 @@ public void should_stop_if_cancelled_between_attempts() {
143151
schedule.delay = 3000;
144152
schedule.tick();
145153

146-
// At this point the reconnect should be scheduled but not executed yet, cancel the future.
154+
// Ensure reconnect is scheduled (slight timing window after handling failed reconnect
155+
// and scheduling next reconnect).
156+
verify(executor, timeout(1000)).schedule(handler, schedule.delay, TimeUnit.MILLISECONDS);
157+
158+
// Add a short delay to account for nextTry being assigned after schedule completes.
159+
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
147160
future.get().cancel(false);
148161

149-
// Should block until scheduler runs, detects cancel and exits.
162+
// Should immediately return as the future was cancelled while the task was scheduled.
150163
waitForCompletion();
151164

152165
assertThat(work.success).isFalse();
153166
// Should have had 1 failed attempt, no second attempt since cancelled.
154167
assertThat(work.tries).isEqualTo(1);
155-
// The future will be marked null once it is detected as cancelled.
156-
assertThat(future.get()).isNull();
168+
169+
// The future will be marked cancelled and thus not executed.
170+
ListenableFuture<?> currentAttempt = future.get();
171+
assertThat(currentAttempt).isInstanceOf(HandlerFuture.class);
172+
HandlerFuture handlerFuture = (HandlerFuture)currentAttempt;
173+
assertThat(handlerFuture.isCancelled());
174+
175+
// The next try should also be cancelled.
176+
assertThat(handlerFuture.nextTry).isNotNull();
177+
assertThat(handlerFuture.nextTry.isCancelled());
157178
}
158179

159180
@Test(groups = "unit")

driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -506,19 +506,30 @@ private void initKeyspace() {
506506

507507
private void clearSimpleKeyspace() {
508508
if(keyspace != null) {
509-
// Temporarily extend read timeout to 1 minute to accommodate keyspaces with many tables.
510-
// This should be more than enough although some tests create many tables, so dropping a keyspace
511-
// could take as long as 12 seconds on restricted hardware.
509+
logger.debug("Removing keyspace {}.", keyspace);
510+
// Temporarily extend read timeout to 1 minute to accommodate dropping keyspaces and
511+
// tables being slow, particularly in a CI environment.
512512
int currentTimeout = cluster.getConfiguration().getSocketOptions().getReadTimeoutMillis();
513513
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(60000);
514514
try {
515+
KeyspaceMetadata ksm = cluster.getMetadata().getKeyspace(keyspace);
516+
if (ksm != null) {
517+
// drop each table individually as this seems to be more dependable than dropping
518+
// the entire keyspace at once if it has many tables.
519+
if(ksm.getTables().size() > 10) {
520+
for (TableMetadata table : ksm.getTables()) {
521+
logger.debug("Dropping table {}.{}.", keyspace, table.getName());
522+
session.execute("DROP TABLE " + keyspace + "." + table.getName());
523+
}
524+
}
525+
}
526+
logger.debug("Dropping keyspace {}.", keyspace);
515527
session.execute("DROP KEYSPACE " + keyspace);
516528
} finally {
517529
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(currentTimeout);
518530
}
519531
}
520532
}
521-
522533
}
523534

524535
public static class CCMCluster {

driver-core/src/test/java/com/datastax/driver/core/ClusterAssert.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18-
import com.google.common.collect.Iterators;
19-
import org.assertj.core.api.AbstractAssert;
20-
2118
import java.util.Iterator;
2219
import java.util.Set;
2320
import java.util.TreeSet;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import com.google.common.collect.Iterators;
24+
import org.assertj.core.api.AbstractAssert;
2425

2526
import static org.assertj.core.api.Assertions.assertThat;
2627

@@ -47,19 +48,17 @@ public ClusterAssert hasOpenControlConnection() {
4748
}
4849

4950
public HostAssert host(int hostNumber) {
50-
// TODO at some point this won't work anymore if we have assertions that wait for a node to
51-
// join the cluster, e.g. assertThat(cluster).node(3).comesUp().
52-
return new HostAssert(
53-
TestUtils.findHost(actual, hostNumber),
54-
actual);
51+
// Wait for the node to be added if it's not already known.
52+
// In 2.2+ C* does not send an added event until the node is ready so we wait a long time.
53+
Host host = TestUtils.findOrWaitForHost(actual, hostNumber,
54+
60 + Cluster.NEW_NODE_DELAY_SECONDS, TimeUnit.SECONDS);
55+
return new HostAssert(host, actual);
5556
}
5657

5758
public HostAssert host(String hostAddress) {
58-
// TODO at some point this won't work anymore if we have assertions that wait for a node to
59-
// join the cluster, e.g. assertThat(cluster).node(3).comesUp().
60-
return new HostAssert(
61-
TestUtils.findHost(actual, hostAddress),
62-
actual);
59+
Host host = TestUtils.findOrWaitForHost(actual, hostAddress,
60+
60 + Cluster.NEW_NODE_DELAY_SECONDS, TimeUnit.SECONDS);
61+
return new HostAssert(host, actual);
6362
}
6463

6564
/**

driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.common.collect.ImmutableMap;
2828
import com.google.common.collect.ImmutableSet;
2929
import com.google.common.collect.Lists;
30+
import com.google.common.util.concurrent.Uninterruptibles;
3031
import org.scassandra.Scassandra;
3132
import org.scassandra.http.client.PrimingClient;
3233
import org.scassandra.http.client.PrimingRequest;
@@ -41,14 +42,13 @@
4142

4243
import com.datastax.driver.core.exceptions.NoHostAvailableException;
4344
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
44-
import com.datastax.driver.core.utils.CassandraVersion;
4545

46-
import static com.datastax.driver.core.Assertions.*;
46+
import static com.datastax.driver.core.Assertions.assertThat;
47+
import static com.datastax.driver.core.Assertions.fail;
4748
import static com.datastax.driver.core.FakeHost.Behavior.THROWING_CONNECT_TIMEOUTS;
48-
import static com.datastax.driver.core.Host.State.DOWN;
49-
import static com.datastax.driver.core.Host.State.UP;
5049
import static com.datastax.driver.core.HostDistance.LOCAL;
5150
import static com.datastax.driver.core.TestUtils.nonDebouncingQueryOptions;
51+
import static com.datastax.driver.core.TestUtils.nonQuietClusterCloseOptions;
5252

5353
public class ClusterInitTest {
5454
private static final Logger logger = LoggerFactory.getLogger(ClusterInitTest.class);
@@ -58,7 +58,6 @@ public class ClusterInitTest {
5858
* causing timeouts, we want to ensure that the driver does not wait multiple times on the same host.
5959
*/
6060
@Test(groups = "short")
61-
@CassandraVersion(major=2.0, description = "Scassandra currently broken with protocol version 1.")
6261
public void should_handle_failing_or_missing_contact_points() throws UnknownHostException {
6362
Cluster cluster = null;
6463
Scassandra scassandra = null;
@@ -128,9 +127,20 @@ public void should_handle_failing_or_missing_contact_points() throws UnknownHost
128127
verify(socketOptions, atLeast(6)).getKeepAlive();
129128
verify(socketOptions, atMost(7)).getKeepAlive();
130129

131-
assertThat(cluster).host(realHostAddress).isNotNull().hasState(UP);
130+
assertThat(cluster).host(realHostAddress).isNotNull().isUp();
131+
// It is likely but not guaranteed that a host is marked down at this point.
132+
// It should eventually be marked down as Cluster.Manager.triggerOnDown should be
133+
// called and submit a task that marks the host down.
132134
for (FakeHost failingHost : failingHosts) {
133-
assertThat(cluster).host(failingHost.address).hasState(DOWN);
135+
assertThat(cluster).host(failingHost.address).goesDownWithin(10, TimeUnit.SECONDS);
136+
Host host = TestUtils.findHost(cluster, failingHost.address);
137+
// There is a possible race here in that the host is marked down in a separate Executor in onDown
138+
// and then starts a periodic reconnection attempt shortly after. Since setDown is called before
139+
// startPeriodicReconnectionAttempt, we add a slight delay here if the future isn't set yet.
140+
if(host.getReconnectionAttemptFuture() == null || host.getReconnectionAttemptFuture().isDone()) {
141+
logger.warn("Periodic Reconnection Attempt hasn't started yet for {}, waiting 1 second and then checking.", host);
142+
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
143+
}
134144
assertThat(cluster).host(failingHost.address).isReconnectingFromDown();
135145
}
136146
assertThat(cluster).host(missingHostAddress).isNull();
@@ -169,7 +179,9 @@ public void should_not_schedule_reconnections_before_init_complete() {
169179
try {
170180
cluster.init();
171181
} finally {
172-
assertThat(reconnectionPolicy.count.get()).isEqualTo(0);
182+
// We expect a nextDelay invocation from the ConvictionPolicy for each host, but that will
183+
// not trigger a reconnection.
184+
assertThat(reconnectionPolicy.count.get()).isEqualTo(2);
173185
for (FakeHost fakeHost : hosts) {
174186
fakeHost.stop();
175187
}
@@ -189,6 +201,7 @@ public void should_not_schedule_reconnections_before_init_complete() {
189201
public void should_be_able_to_close_cluster_that_never_successfully_connected() throws Exception {
190202
Cluster cluster = Cluster.builder()
191203
.addContactPointsWithPorts(Collections.singleton(new InetSocketAddress("127.0.0.1", 65534)))
204+
.withNettyOptions(nonQuietClusterCloseOptions)
192205
.build();
193206
try {
194207
cluster.connect();

driver-core/src/test/java/com/datastax/driver/core/HostAssert.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -120,26 +120,4 @@ public void onDown(Host host) {
120120
fail(actual + " did not go down within " + duration + " " + unit);
121121
return this;
122122
}
123-
124-
public static class StateListenerBase implements StateListener {
125-
@Override
126-
public void onAdd(Host host) {
127-
}
128-
129-
@Override
130-
public void onUp(Host host) {
131-
}
132-
133-
@Override
134-
public void onSuspected(Host host) {
135-
}
136-
137-
@Override
138-
public void onDown(Host host) {
139-
}
140-
141-
@Override
142-
public void onRemove(Host host) {
143-
}
144-
}
145123
}

driver-core/src/test/java/com/datastax/driver/core/QueryOptionsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public void beforeMethod() {
6666
public void validatePrepared(boolean expectAll) {
6767
// Prepare the statement
6868
String query = "select sansa_stark from the_known_world";
69-
session.prepare(query);
69+
PreparedStatement statement = session.prepare(query);
70+
assertThat(cluster.manager.preparedQueries).containsValue(statement);
7071

7172
// Ensure prepared properly based on expectation.
7273
List<PreparedStatementPreparation> preparationOne = scassandra.retrievePreparedStatementPreparations(1);

driver-core/src/test/java/com/datastax/driver/core/RefreshConnectedHostTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ public void should_refresh_single_connected_host() {
7878
assertThat(cluster).host(2)
7979
.hasState(State.UP)
8080
.isAtDistance(HostDistance.LOCAL);
81+
82+
// Ensure that the host is added to the Cluster.
8183
assertThat(cluster).host(3)
8284
.comesUpWithin(Cluster.NEW_NODE_DELAY_SECONDS+1, SECONDS)
8385
.isAtDistance(HostDistance.IGNORED);

driver-core/src/test/java/com/datastax/driver/core/SessionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public void run() {
290290
startLatch.countDown();
291291

292292
executor.shutdown();
293-
boolean normalShutdown = executor.awaitTermination(1, TimeUnit.SECONDS);
293+
boolean normalShutdown = executor.awaitTermination(5, TimeUnit.SECONDS);
294294
assertTrue(normalShutdown);
295295

296296
// The deadlock occurred here before JAVA-418
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (C) 2012-2015 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.driver.core;
17+
18+
public class StateListenerBase implements Host.StateListener {
19+
@Override
20+
public void onAdd(Host host) {
21+
}
22+
23+
@Override
24+
public void onUp(Host host) {
25+
}
26+
27+
@Override
28+
public void onSuspected(Host host) {
29+
30+
}
31+
32+
@Override
33+
public void onDown(Host host) {
34+
}
35+
36+
@Override
37+
public void onRemove(Host host) {
38+
}
39+
40+
}

0 commit comments

Comments
 (0)