Skip to content

Commit b430b15

Browse files
tolbertamolim7t
authored andcommitted
Repeat test until callback invoked on IO thread. (apache#669)
1 parent 7edb4d7 commit b430b15

1 file changed

Lines changed: 30 additions & 14 deletions

File tree

driver-core/src/test/java/com/datastax/driver/core/AsyncQueryTest.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.google.common.base.Throwables;
2121
import com.google.common.collect.Lists;
2222
import com.google.common.util.concurrent.*;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325
import org.testng.annotations.DataProvider;
2426
import org.testng.annotations.Test;
2527

@@ -34,6 +36,8 @@
3436

3537
public class AsyncQueryTest extends CCMBridge.PerClassSingleNodeCluster {
3638

39+
Logger logger = LoggerFactory.getLogger(AsyncQueryTest.class);
40+
3741
@DataProvider(name = "keyspace")
3842
public static Object[][] keyspace() {
3943
return new Object[][]{{"asyncquerytest"}, {"\"AsyncQueryTest\""}};
@@ -133,22 +137,34 @@ public void should_propagate_error_to_chained_query_if_session_init_fails() thro
133137

134138
@Test(groups = "short")
135139
public void should_fail_when_synchronous_call_on_io_thread() throws Exception {
136-
ResultSetFuture f = session.executeAsync("select release_version from system.local");
137-
ListenableFuture<Void> f2 = Futures.transform(f, new Function<ResultSet, Void>() {
138-
@Override
139-
public Void apply(ResultSet input) {
140-
session.execute("select release_version from system.local");
141-
return null;
140+
final Thread sameThread = Thread.currentThread();
141+
for (int i = 0; i < 1000; i++) {
142+
ResultSetFuture f = session.executeAsync("select release_version from system.local");
143+
ListenableFuture<Thread> f2 = Futures.transform(f, new Function<ResultSet, Thread>() {
144+
@Override
145+
public Thread apply(ResultSet input) {
146+
session.execute("select release_version from system.local");
147+
return Thread.currentThread();
148+
}
149+
});
150+
try {
151+
Thread executedThread = f2.get();
152+
if(executedThread != sameThread) {
153+
fail("Expected a failed future, callback was executed on " + executedThread);
154+
} else {
155+
// Callback was invoked on the same thread, which indicates that the future completed
156+
// before the transform callback was registered. Try again to produce case where callback
157+
// is called on io thread.
158+
logger.warn("Future completed before transform callback registered, will try again.");
159+
}
160+
} catch (Exception e) {
161+
assertThat(Throwables.getRootCause(e))
162+
.isInstanceOf(IllegalStateException.class)
163+
.hasMessageContaining("Detected a synchronous Session call");
164+
return;
142165
}
143-
});
144-
try {
145-
f2.get();
146-
fail("Expected a failed future");
147-
} catch (Exception e) {
148-
assertThat(Throwables.getRootCause(e))
149-
.isInstanceOf(IllegalStateException.class)
150-
.hasMessageContaining("Detected a synchronous Session call");
151166
}
167+
fail("callback was not executed on io thread in 1000 attempts, something may be wrong.");
152168
}
153169

154170
private ListenableFuture<Integer> connectAndQuery(String keyspace, Executor executor) {

0 commit comments

Comments
 (0)