Skip to content

Commit 251617d

Browse files
author
Alexandre Dutra
committed
Merge branch '3.0.x' into 3.1.x
2 parents cbf4095 + 3ba47ce commit 251617d

9 files changed

Lines changed: 107 additions & 7 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ Merged from 2.1 branch:
136136
### 3.0.8 (in progress)
137137

138138
- [bug] JAVA-1404: Fix min token handling in TokenRange.contains.
139+
- [bug] JAVA-1429: Prevent heartbeats until connection is fully initialized.
139140

140141

141142
### 3.0.7

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Message.Response response
10771077

10781078
@Override
10791079
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1080-
if (!isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
1080+
if (isInitialized && !isClosed() && evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
10811081
logger.debug("{} was inactive for {} seconds, sending heartbeat", Connection.this, factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds());
10821082
write(HEARTBEAT_CALLBACK);
10831083
}

driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.testng.annotations.BeforeMethod;
2121
import org.testng.annotations.Test;
2222

23+
import java.net.InetSocketAddress;
24+
import java.util.Timer;
25+
import java.util.TimerTask;
2326
import java.util.concurrent.TimeUnit;
2427

2528
import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD;
@@ -86,4 +89,47 @@ public void should_fail_to_connect_without_credentials() throws InterruptedExcep
8689
}
8790
}
8891

92+
/**
93+
* Ensures that authentication is possible even if the server is busy during
94+
* SASL handshake.
95+
*
96+
* @jira_ticket JAVA-1429
97+
*/
98+
@Test(groups = "short")
99+
@CCMConfig(dirtiesContext = true)
100+
public void should_connect_with_slow_server() throws InterruptedException {
101+
Cluster cluster = Cluster.builder()
102+
.addContactPoints(getContactPoints())
103+
.withPort(ccm().getBinaryPort())
104+
.withAuthProvider(new SlowAuthProvider())
105+
.withPoolingOptions(new PoolingOptions()
106+
.setHeartbeatIntervalSeconds(1))
107+
.build();
108+
cluster.connect();
109+
}
110+
111+
private class SlowAuthProvider extends PlainTextAuthProvider {
112+
113+
public SlowAuthProvider() {
114+
super("cassandra", "cassandra");
115+
}
116+
117+
@Override
118+
public Authenticator newAuthenticator(InetSocketAddress host, String authenticator) throws AuthenticationException {
119+
simulateBusyServer();
120+
return super.newAuthenticator(host, authenticator);
121+
}
122+
123+
}
124+
125+
private void simulateBusyServer() {
126+
ccm().pause(1);
127+
new Timer().schedule(new TimerTask() {
128+
@Override
129+
public void run() {
130+
ccm().resume(1);
131+
}
132+
}, 2000);
133+
}
134+
89135
}

driver-core/src/test/java/com/datastax/driver/core/CCMAccess.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,20 @@ enum Workload {cassandra, solr, hadoop, spark, cfs, graph}
168168
*/
169169
void forceStop(int n);
170170

171+
/**
172+
* Pauses the {@code nth} host in the CCM cluster.
173+
*
174+
* @param n the node number (starting from 1).
175+
*/
176+
void pause(int n);
177+
178+
/**
179+
* Resumes the {@code nth} host in the CCM cluster.
180+
*
181+
* @param n the node number (starting from 1).
182+
*/
183+
void resume(int n);
184+
171185
/**
172186
* Removes the {@code nth} host in the CCM cluster.
173187
*

driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,18 @@ public void forceStop(int n) {
521521
execute(CCM_COMMAND + " node%d stop --not-gently", n);
522522
}
523523

524+
@Override
525+
public void pause(int n) {
526+
logger.debug(String.format("Pausing: node %s (%s%s:%s) in %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
527+
execute(CCM_COMMAND + " node%d pause", n);
528+
}
529+
530+
@Override
531+
public void resume(int n) {
532+
logger.debug(String.format("Resuming: node %s (%s%s:%s) in %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
533+
execute(CCM_COMMAND + " node%d resume", n);
534+
}
535+
524536
@Override
525537
public void remove(int n) {
526538
logger.debug(String.format("Removing: node %s (%s%s:%s) from %s", n, TestUtils.IP_PREFIX, n, binaryPort, this));
@@ -679,13 +691,15 @@ protected void processLine(String line, int logLevel) {
679691
/**
680692
* Waits for a host to be up by pinging the TCP socket directly, without using the Java driver's API.
681693
*/
694+
@Override
682695
public void waitForUp(int node) {
683696
TestUtils.waitUntilPortIsUp(addressOfNode(node));
684697
}
685698

686699
/**
687700
* Waits for a host to be down by pinging the TCP socket directly, without using the Java driver's API.
688701
*/
702+
@Override
689703
public void waitForDown(int node) {
690704
TestUtils.waitUntilPortIsDown(addressOfNode(node));
691705
}

driver-core/src/test/java/com/datastax/driver/core/CCMCache.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,16 @@ public void forceStop(int n) {
165165
ccm.forceStop(n);
166166
}
167167

168+
@Override
169+
public void pause(int n) {
170+
ccm.pause(n);
171+
}
172+
173+
@Override
174+
public void resume(int n) {
175+
ccm.resume(n);
176+
}
177+
168178
@Override
169179
public void remove(int n) {
170180
ccm.remove(n);

driver-core/src/test/java/com/datastax/driver/core/CCMTestsSupport.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,16 @@ public void forceStop(int n) {
177177
throw new UnsupportedOperationException("This CCM cluster is read-only");
178178
}
179179

180+
@Override
181+
public void pause(int n) {
182+
throw new UnsupportedOperationException("This CCM cluster is read-only");
183+
}
184+
185+
@Override
186+
public void resume(int n) {
187+
throw new UnsupportedOperationException("This CCM cluster is read-only");
188+
}
189+
180190
@Override
181191
public void remove(int n) {
182192
throw new UnsupportedOperationException("This CCM cluster is read-only");

driver-core/src/test/java/com/datastax/driver/core/HostConnectionPoolTest.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import com.codahale.metrics.Gauge;
1919
import com.datastax.driver.core.exceptions.*;
2020
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
21-
import com.datastax.driver.core.utils.MoreFutures;
21+
import com.google.common.base.Function;
2222
import com.google.common.base.Predicate;
2323
import com.google.common.base.Throwables;
2424
import com.google.common.util.concurrent.*;
@@ -290,7 +290,7 @@ public void should_adjust_connection_keyspace_on_dequeue_if_pool_state_is_differ
290290
int count = 0;
291291
for (MockRequest queuedRequest : queuedRequests) {
292292
try {
293-
Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 5, TimeUnit.SECONDS);
293+
Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 10, TimeUnit.SECONDS);
294294
count++;
295295
} catch (ExecutionException e) {
296296
// 128th request should timeout since all in flight requests are used.
@@ -400,7 +400,6 @@ public void should_fail_in_dequeue_when_setting_keyspace_and_another_set_keyspac
400400
assertThat(e.getCause()).isInstanceOf(DriverException.class);
401401
assertThat(e.getCause().getMessage()).contains("Aborting attempt to set keyspace to 'newkeyspace' since there is already an in flight attempt to set keyspace to 'slowks'.");
402402
}
403-
404403
} finally {
405404
MockRequest.completeAll(requests);
406405
cluster.close();
@@ -1242,8 +1241,8 @@ public void should_wait_on_connection_if_zero_core_connections() throws Exceptio
12421241

12431242
// Should create up to core connections.
12441243
verify(factory, timeout(readTimeout).times(1)).open(any(HostConnectionPool.class));
1245-
12461244
assertPoolSize(pool, 1);
1245+
Uninterruptibles.getUninterruptibly(request.requestInitialized, 10, TimeUnit.SECONDS);
12471246
request.simulateSuccessResponse();
12481247
} finally {
12491248
cluster.close();
@@ -1348,6 +1347,9 @@ static class MockRequest implements Connection.ResponseCallback {
13481347
enum State {START, COMPLETED, FAILED, TIMED_OUT}
13491348

13501349
final ListenableFuture<Connection> connectionFuture;
1350+
1351+
final ListenableFuture<Connection.ResponseHandler> requestInitialized;
1352+
13511353
private volatile Connection.ResponseHandler responseHandler;
13521354

13531355
final AtomicReference<State> state = new AtomicReference<State>(State.START);
@@ -1404,12 +1406,13 @@ private static void completeAll(List<MockRequest> requests) {
14041406

14051407
private MockRequest(HostConnectionPool pool, int timeoutMillis, int maxQueueSize) throws ConnectionException {
14061408
this.connectionFuture = pool.borrowConnection(timeoutMillis, MILLISECONDS, maxQueueSize);
1407-
Futures.addCallback(this.connectionFuture, new MoreFutures.SuccessCallback<Connection>() {
1409+
requestInitialized = Futures.transform(this.connectionFuture, new Function<Connection, Connection.ResponseHandler>() {
14081410
@Override
1409-
public void onSuccess(Connection connection) {
1411+
public Connection.ResponseHandler apply(Connection connection) {
14101412
MockRequest thisRequest = MockRequest.this;
14111413
thisRequest.responseHandler = new Connection.ResponseHandler(connection, -1, thisRequest);
14121414
connection.dispatcher.add(thisRequest.responseHandler);
1415+
return responseHandler;
14131416
}
14141417
});
14151418
}

driver-mapping/src/test/java/com/datastax/driver/mapping/MapperSaveNullFieldsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.datastax.driver.core.BoundStatement;
1919
import com.datastax.driver.core.CCMTestsSupport;
20+
import com.datastax.driver.core.utils.CassandraVersion;
2021
import com.datastax.driver.mapping.Mapper.Option;
2122
import com.datastax.driver.mapping.annotations.PartitionKey;
2223
import com.datastax.driver.mapping.annotations.Table;
@@ -40,6 +41,7 @@ public void setup() {
4041
mapper = new MappingManager(session()).mapper(User.class);
4142
}
4243

44+
@CassandraVersion("2.1.0")
4345
@Test(groups = "short")
4446
void should_save_null_fields_if_requested() {
4547
should_save_null_fields(true, Option.saveNullFields(true));

0 commit comments

Comments
 (0)