Skip to content

Commit ec0b124

Browse files
author
Alexandre Dutra
committed
JAVA-1431: Improve error handling during pool initialization
This commit also adds tests for errors during pool initialization using the byteman library.
1 parent 24723df commit ec0b124

11 files changed

Lines changed: 244 additions & 13 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- [improvement] JAVA-1458: Check thread in mapper sync methods.
1818
- [improvement] JAVA-1488: Upgrade Netty to 4.0.47.Final.
1919
- [improvement] JAVA-1460: Add speculative execution number to ExecutionInfo
20+
- [improvement] JAVA-1431: Improve error handling during pool initialization.
2021

2122

2223
### 3.2.0

driver-core/pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,19 @@
142142
<scope>test</scope>
143143
</dependency>
144144

145+
<dependency>
146+
<groupId>org.jboss.byteman</groupId>
147+
<artifactId>byteman-bmunit</artifactId>
148+
<scope>test</scope>
149+
<version>${byteman.version}</version>
150+
<exclusions>
151+
<exclusion>
152+
<groupId>org.testng</groupId>
153+
<artifactId>testng</artifactId>
154+
</exclusion>
155+
</exclusions>
156+
</dependency>
157+
145158
</dependencies>
146159

147160
<build>

driver-core/src/main/java/com/datastax/driver/core/CloseFuture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public CloseFuture force() {
5151
* Try to force the completion of the shutdown this is a future of.
5252
* <p/>
5353
* This method will do its best to expedite the shutdown process. In
54-
* particular, all connections will be closed right away, even if there is
54+
* particular, all connections will be closed right away, even if there are
5555
* ongoing queries at the time this method is called.
5656
* <p/>
5757
* Note that this method does not block. The completion of this method does

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,8 @@ public ListenableFuture<Void> apply(Throwable t) throws Exception {
193193
future.setException(t);
194194
} else {
195195
// Defunct to ensure that the error will be signaled (marking the host down)
196-
Exception e = (t instanceof ConnectionException || t instanceof DriverException || t instanceof InterruptedException)
197-
? (Exception) t
196+
Throwable e = (t instanceof ConnectionException || t instanceof DriverException || t instanceof InterruptedException || t instanceof Error)
197+
? t
198198
: new ConnectionException(Connection.this.address,
199199
String.format("Unexpected error during transport initialization (%s)", t),
200200
t);
@@ -422,7 +422,7 @@ int maxAvailableStreams() {
422422
return dispatcher.streamIdHandler.maxAvailableStreams();
423423
}
424424

425-
<E extends Exception> E defunct(E e) {
425+
<E extends Throwable> E defunct(E e) {
426426
if (isDefunct.compareAndSet(false, true)) {
427427

428428
if (Host.statesLogger.isTraceEnabled())
@@ -846,6 +846,8 @@ static RuntimeException launderAsyncInitException(ExecutionException e) throws C
846846
throw (ClusterNameMismatchException) t;
847847
if (t instanceof DriverException)
848848
throw (DriverException) t;
849+
if (t instanceof Error)
850+
throw (Error) t;
849851

850852
return new RuntimeException("Unexpected exception during connection initialization", t);
851853
}

driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,14 @@ public ListenableFuture<Void> apply(Throwable t) throws Exception {
174174
// accordingly in SessionManager#maybeAddPool.
175175
Throwables.propagateIfInstanceOf(t, ClusterNameMismatchException.class);
176176
Throwables.propagateIfInstanceOf(t, UnsupportedProtocolVersionException.class);
177+
Throwables.propagateIfInstanceOf(t, AuthenticationException.class);
177178

178179
// We don't want to swallow Errors either as they probably indicate a more serious issue (OOME...)
179180
Throwables.propagateIfInstanceOf(t, Error.class);
180181

181-
// Otherwise, return success. The pool will simply ignore this connection when it sees that it's been closed.
182+
// Otherwise, log the exception but return success.
183+
// The pool will simply ignore this connection when it sees that it's been closed.
184+
logger.warn("Error creating connection to " + host, t);
182185
return MoreFutures.VOID_SUCCESS;
183186
}
184187
}, executor);

driver-core/src/main/java/com/datastax/driver/core/SessionManager.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,16 @@ public void onFailure(Throwable t) {
376376
cluster.manager.triggerOnDown(host, false);
377377
} else {
378378
logger.warn("Error creating pool to " + host, t);
379+
// do not mark the host down, as there could be other connections to it
380+
// (e.g. the control connection, or another session pool).
381+
// The conviction policy will mark it down if it has no more active connections.
379382
}
380-
future.set(false);
383+
// propagate errors; for all other exceptions, consider the pool init failed
384+
// but allow the session init process to continue normally
385+
if (t instanceof Error)
386+
future.setException(t);
387+
else
388+
future.set(false);
381389
}
382390
});
383391
return future;
@@ -431,7 +439,7 @@ ListenableFuture<?> updateCreatedPools() {
431439
}
432440

433441
// Wait pool creation before removing, so we don't lose connectivity
434-
ListenableFuture<?> allPoolsCreatedFuture = Futures.successfulAsList(poolCreatedFutures);
442+
ListenableFuture<?> allPoolsCreatedFuture = Futures.allAsList(poolCreatedFutures);
435443

436444
return GuavaCompatibility.INSTANCE.transformAsync(allPoolsCreatedFuture, new AsyncFunction<Object, List<Void>>() {
437445
@Override
@@ -452,11 +460,7 @@ void updateCreatedPools(Host h) {
452460
try {
453461
if (pool == null) {
454462
if (dist != HostDistance.IGNORED && h.state == Host.State.UP)
455-
try {
456-
maybeAddPool(h, null).get();
457-
} catch (ExecutionException e) {
458-
// Ignore, maybeAddPool has already handled the error
459-
}
463+
maybeAddPool(h, null).get();
460464
} else if (dist != pool.hostDistance) {
461465
if (dist == HostDistance.IGNORED) {
462466
removePool(h).get();
@@ -468,7 +472,10 @@ void updateCreatedPools(Host h) {
468472
} catch (InterruptedException e) {
469473
Thread.currentThread().interrupt();
470474
} catch (ExecutionException e) {
471-
logger.error("Unexpected error while refreshing connection pools", e.getCause());
475+
Throwable cause = e.getCause();
476+
logger.error("Unexpected error while refreshing connection pools", cause);
477+
if (cause instanceof Error)
478+
throw ((Error) cause);
472479
}
473480
}
474481

driver-core/src/test/java/com/datastax/driver/core/AuthenticationTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.datastax.driver.core.exceptions.AuthenticationException;
1919
import com.google.common.util.concurrent.Uninterruptibles;
20+
import org.apache.log4j.Level;
2021
import org.testng.annotations.BeforeMethod;
2122
import org.testng.annotations.Test;
2223

@@ -132,4 +133,38 @@ public void run() {
132133
}, 2000);
133134
}
134135

136+
/**
137+
* Ensures that when a host replies with AuthenticationException
138+
* during connection pool initialization the pool creation is aborted.
139+
*
140+
* @jira_ticket JAVA-1431
141+
*/
142+
@Test(groups = "short")
143+
public void should_not_create_pool_with_wrong_credentials() throws InterruptedException {
144+
PlainTextAuthProvider authProvider = new PlainTextAuthProvider("cassandra", "cassandra");
145+
Cluster cluster = register(Cluster.builder()
146+
.addContactPoints(getContactPoints())
147+
.withPort(ccm().getBinaryPort())
148+
.withAuthProvider(authProvider)
149+
.build());
150+
cluster.init();
151+
authProvider.setPassword("wrong");
152+
Level previous = TestUtils.setLogLevel(Session.class, Level.WARN);
153+
MemoryAppender logs = new MemoryAppender().enableFor(Session.class);
154+
Session session;
155+
try {
156+
session = cluster.connect();
157+
} finally {
158+
TestUtils.setLogLevel(Session.class, previous);
159+
logs.disableFor(Session.class);
160+
}
161+
assertThat(session.getState().getConnectedHosts()).isEmpty();
162+
InetSocketAddress host = ccm().addressOfNode(1);
163+
assertThat(logs.get())
164+
.contains(
165+
"Error creating pool to " + host,
166+
"Authentication error on host " + host,
167+
AuthenticationException.class.getSimpleName());
168+
}
169+
135170
}

driver-core/src/test/java/com/datastax/driver/core/MemoryAppender.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18+
import org.apache.log4j.Logger;
1819
import org.apache.log4j.PatternLayout;
1920
import org.apache.log4j.WriterAppender;
2021
import org.apache.log4j.spi.LoggingEvent;
@@ -95,4 +96,49 @@ public String getNext() {
9596
nextLogIdx += next.length();
9697
return next;
9798
}
99+
100+
public MemoryAppender enableFor(Class<?>... loggers) {
101+
for (Class<?> logger : loggers) {
102+
enableFor(logger.getName());
103+
}
104+
return this;
105+
}
106+
107+
public MemoryAppender enableFor(org.slf4j.Logger... loggers) {
108+
for (org.slf4j.Logger logger : loggers) {
109+
enableFor(logger.getName());
110+
}
111+
return this;
112+
}
113+
114+
public MemoryAppender enableFor(String... loggers) {
115+
for (String logger : loggers) {
116+
Logger log4jLogger = Logger.getLogger(logger);
117+
log4jLogger.addAppender(this);
118+
}
119+
return this;
120+
}
121+
122+
public MemoryAppender disableFor(Class<?>... loggers) {
123+
for (Class<?> logger : loggers) {
124+
disableFor(logger.getName());
125+
}
126+
return this;
127+
}
128+
129+
public MemoryAppender disableFor(org.slf4j.Logger... loggers) {
130+
for (org.slf4j.Logger logger : loggers) {
131+
disableFor(logger.getName());
132+
}
133+
return this;
134+
}
135+
136+
public MemoryAppender disableFor(String... loggers) {
137+
for (String logger : loggers) {
138+
Logger log4jLogger = Logger.getLogger(logger);
139+
log4jLogger.removeAppender(this);
140+
}
141+
return this;
142+
}
143+
98144
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright (C) 2012-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.driver.core;
17+
18+
import org.apache.log4j.Level;
19+
import org.jboss.byteman.contrib.bmunit.BMNGListener;
20+
import org.jboss.byteman.contrib.bmunit.BMRule;
21+
import org.jboss.byteman.contrib.bmunit.BMUnitConfig;
22+
import org.testng.annotations.AfterClass;
23+
import org.testng.annotations.BeforeClass;
24+
import org.testng.annotations.Listeners;
25+
import org.testng.annotations.Test;
26+
27+
import static com.datastax.driver.core.Cluster.builder;
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.assertj.core.api.Assertions.fail;
30+
31+
/**
32+
* Simple test of the Sessions methods against a one node cluster.
33+
*/
34+
@BMUnitConfig(loadDirectory = "target/test-classes")
35+
@Listeners(BMNGListener.class)
36+
@CCMConfig(createCluster = false)
37+
public class SessionErrorTest extends ScassandraTestBase {
38+
39+
private ScassandraCluster scassandra;
40+
private Cluster cluster;
41+
42+
@BeforeClass(groups = "short")
43+
public void setUp() throws Exception {
44+
scassandra = ScassandraCluster.builder().withNodes(2).build();
45+
scassandra.init();
46+
cluster = builder()
47+
.addContactPoints(scassandra.address(1).getAddress())
48+
.withPort(scassandra.getBinaryPort())
49+
.build();
50+
cluster.init();
51+
}
52+
53+
@AfterClass(groups = "short")
54+
public void tearDown() throws Exception {
55+
cluster.close();
56+
scassandra.stop();
57+
}
58+
59+
@Test(groups = "short")
60+
@BMRule(name = "emulate OOME",
61+
targetClass = "com.datastax.driver.core.Connection$4",
62+
targetMethod = "apply(Void)",
63+
action = "throw new OutOfMemoryError(\"not really\")"
64+
)
65+
public void should_propagate_errors() {
66+
try {
67+
cluster.connect();
68+
fail("Expecting OOME");
69+
} catch (OutOfMemoryError e) {
70+
assertThat(e).hasMessage("not really");
71+
}
72+
}
73+
74+
@Test(groups = "short")
75+
@BMRule(name = "emulate NPE",
76+
targetClass = "com.datastax.driver.core.Connection$4",
77+
targetMethod = "apply(Void)",
78+
action = "throw new NullPointerException(\"not really\")"
79+
)
80+
public void should_not_propagate_unchecked_exceptions() {
81+
Level previous = TestUtils.setLogLevel(HostConnectionPool.class, Level.WARN);
82+
MemoryAppender logs = new MemoryAppender().enableFor(HostConnectionPool.class);
83+
try {
84+
Session session = cluster.connect();
85+
// Pool to host1 should be still open because host1 is the control host,
86+
// but its pool should have no active connection
87+
// Pool to host2 should have been closed because host2 has no
88+
// more active connections
89+
Session.State state = session.getState();
90+
Host host1 = scassandra.host(cluster, 1, 1);
91+
Host host2 = scassandra.host(cluster, 1, 2);
92+
assertThat(state.getConnectedHosts()).hasSize(1).containsExactly(host1);
93+
assertThat(state.getOpenConnections(host1)).isEqualTo(0); // pool open but empty
94+
assertThat(state.getOpenConnections(host2)).isEqualTo(0); // pool closed
95+
assertThat(logs.get())
96+
.contains(
97+
"Unexpected error during transport initialization",
98+
"not really",
99+
NullPointerException.class.getSimpleName(),
100+
"com.datastax.driver.core.Connection$4.apply");
101+
} finally {
102+
TestUtils.setLogLevel(HostConnectionPool.class, previous);
103+
logs.disableFor(HostConnectionPool.class);
104+
}
105+
}
106+
107+
}

driver-core/src/test/java/com/datastax/driver/core/TestUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.util.concurrent.Uninterruptibles;
2525
import com.sun.management.OperatingSystemMXBean;
2626
import io.netty.channel.EventLoopGroup;
27+
import org.apache.log4j.Level;
2728
import org.scassandra.Scassandra;
2829
import org.scassandra.ScassandraFactory;
2930
import org.slf4j.Logger;
@@ -916,4 +917,19 @@ public static ByteBuffer serializeForCompositeType(Object... params) {
916917
return res;
917918
}
918919

920+
public static Level setLogLevel(Class<?> logger, Level newLevel) {
921+
return setLogLevel(logger.getName(), newLevel);
922+
}
923+
924+
public static Level setLogLevel(Logger logger, Level newLevel) {
925+
return setLogLevel(logger.getName(), newLevel);
926+
}
927+
928+
public static Level setLogLevel(String logger, Level newLevel) {
929+
org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(logger);
930+
Level oldLevel = log4jLogger.getLevel();
931+
log4jLogger.setLevel(newLevel);
932+
return oldLevel;
933+
}
934+
919935
}

0 commit comments

Comments
 (0)