|
20 | 20 | import com.google.common.base.Throwables; |
21 | 21 | import com.google.common.collect.Lists; |
22 | 22 | import com.google.common.util.concurrent.*; |
| 23 | +import org.slf4j.Logger; |
| 24 | +import org.slf4j.LoggerFactory; |
23 | 25 | import org.testng.annotations.DataProvider; |
24 | 26 | import org.testng.annotations.Test; |
25 | 27 |
|
|
34 | 36 |
|
35 | 37 | public class AsyncQueryTest extends CCMBridge.PerClassSingleNodeCluster { |
36 | 38 |
|
| 39 | + Logger logger = LoggerFactory.getLogger(AsyncQueryTest.class); |
| 40 | + |
37 | 41 | @DataProvider(name = "keyspace") |
38 | 42 | public static Object[][] keyspace() { |
39 | 43 | return new Object[][]{{"asyncquerytest"}, {"\"AsyncQueryTest\""}}; |
@@ -133,22 +137,34 @@ public void should_propagate_error_to_chained_query_if_session_init_fails() thro |
133 | 137 |
|
134 | 138 | @Test(groups = "short") |
135 | 139 | public void should_fail_when_synchronous_call_on_io_thread() throws Exception { |
136 | | - ResultSetFuture f = session.executeAsync("select release_version from system.local"); |
137 | | - ListenableFuture<Void> f2 = Futures.transform(f, new Function<ResultSet, Void>() { |
138 | | - @Override |
139 | | - public Void apply(ResultSet input) { |
140 | | - session.execute("select release_version from system.local"); |
141 | | - return null; |
| 140 | + final Thread sameThread = Thread.currentThread(); |
| 141 | + for (int i = 0; i < 1000; i++) { |
| 142 | + ResultSetFuture f = session.executeAsync("select release_version from system.local"); |
| 143 | + ListenableFuture<Thread> f2 = Futures.transform(f, new Function<ResultSet, Thread>() { |
| 144 | + @Override |
| 145 | + public Thread apply(ResultSet input) { |
| 146 | + session.execute("select release_version from system.local"); |
| 147 | + return Thread.currentThread(); |
| 148 | + } |
| 149 | + }); |
| 150 | + try { |
| 151 | + Thread executedThread = f2.get(); |
| 152 | + if(executedThread != sameThread) { |
| 153 | + fail("Expected a failed future, callback was executed on " + executedThread); |
| 154 | + } else { |
| 155 | + // Callback was invoked on the same thread, which indicates that the future completed |
| 156 | + // before the transform callback was registered. Try again to produce case where callback |
| 157 | + // is called on io thread. |
| 158 | + logger.warn("Future completed before transform callback registered, will try again."); |
| 159 | + } |
| 160 | + } catch (Exception e) { |
| 161 | + assertThat(Throwables.getRootCause(e)) |
| 162 | + .isInstanceOf(IllegalStateException.class) |
| 163 | + .hasMessageContaining("Detected a synchronous Session call"); |
| 164 | + return; |
142 | 165 | } |
143 | | - }); |
144 | | - try { |
145 | | - f2.get(); |
146 | | - fail("Expected a failed future"); |
147 | | - } catch (Exception e) { |
148 | | - assertThat(Throwables.getRootCause(e)) |
149 | | - .isInstanceOf(IllegalStateException.class) |
150 | | - .hasMessageContaining("Detected a synchronous Session call"); |
151 | 166 | } |
| 167 | + fail("callback was not executed on io thread in 1000 attempts, something may be wrong."); |
152 | 168 | } |
153 | 169 |
|
154 | 170 | private ListenableFuture<Integer> connectAndQuery(String keyspace, Executor executor) { |
|
0 commit comments