|
18 | 18 | import com.codahale.metrics.Gauge; |
19 | 19 | import com.datastax.driver.core.exceptions.*; |
20 | 20 | import com.datastax.driver.core.policies.ConstantReconnectionPolicy; |
21 | | -import com.datastax.driver.core.utils.MoreFutures; |
| 21 | +import com.google.common.base.Function; |
22 | 22 | import com.google.common.base.Predicate; |
23 | 23 | import com.google.common.base.Throwables; |
24 | 24 | import com.google.common.util.concurrent.*; |
@@ -290,7 +290,7 @@ public void should_adjust_connection_keyspace_on_dequeue_if_pool_state_is_differ |
290 | 290 | int count = 0; |
291 | 291 | for (MockRequest queuedRequest : queuedRequests) { |
292 | 292 | try { |
293 | | - Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 5, TimeUnit.SECONDS); |
| 293 | + Uninterruptibles.getUninterruptibly(queuedRequest.connectionFuture, 10, TimeUnit.SECONDS); |
294 | 294 | count++; |
295 | 295 | } catch (ExecutionException e) { |
296 | 296 | // 128th request should timeout since all in flight requests are used. |
@@ -400,7 +400,6 @@ public void should_fail_in_dequeue_when_setting_keyspace_and_another_set_keyspac |
400 | 400 | assertThat(e.getCause()).isInstanceOf(DriverException.class); |
401 | 401 | assertThat(e.getCause().getMessage()).contains("Aborting attempt to set keyspace to 'newkeyspace' since there is already an in flight attempt to set keyspace to 'slowks'."); |
402 | 402 | } |
403 | | - |
404 | 403 | } finally { |
405 | 404 | MockRequest.completeAll(requests); |
406 | 405 | cluster.close(); |
@@ -1242,8 +1241,8 @@ public void should_wait_on_connection_if_zero_core_connections() throws Exceptio |
1242 | 1241 |
|
1243 | 1242 | // Should create up to core connections. |
1244 | 1243 | verify(factory, timeout(readTimeout).times(1)).open(any(HostConnectionPool.class)); |
1245 | | - |
1246 | 1244 | assertPoolSize(pool, 1); |
| 1245 | + Uninterruptibles.getUninterruptibly(request.requestInitialized, 10, TimeUnit.SECONDS); |
1247 | 1246 | request.simulateSuccessResponse(); |
1248 | 1247 | } finally { |
1249 | 1248 | cluster.close(); |
@@ -1348,6 +1347,9 @@ static class MockRequest implements Connection.ResponseCallback { |
1348 | 1347 | enum State {START, COMPLETED, FAILED, TIMED_OUT} |
1349 | 1348 |
|
1350 | 1349 | final ListenableFuture<Connection> connectionFuture; |
| 1350 | + |
| 1351 | + final ListenableFuture<Connection.ResponseHandler> requestInitialized; |
| 1352 | + |
1351 | 1353 | private volatile Connection.ResponseHandler responseHandler; |
1352 | 1354 |
|
1353 | 1355 | final AtomicReference<State> state = new AtomicReference<State>(State.START); |
@@ -1404,12 +1406,13 @@ private static void completeAll(List<MockRequest> requests) { |
1404 | 1406 |
|
1405 | 1407 | private MockRequest(HostConnectionPool pool, int timeoutMillis, int maxQueueSize) throws ConnectionException { |
1406 | 1408 | this.connectionFuture = pool.borrowConnection(timeoutMillis, MILLISECONDS, maxQueueSize); |
1407 | | - Futures.addCallback(this.connectionFuture, new MoreFutures.SuccessCallback<Connection>() { |
| 1409 | + requestInitialized = Futures.transform(this.connectionFuture, new Function<Connection, Connection.ResponseHandler>() { |
1408 | 1410 | @Override |
1409 | | - public void onSuccess(Connection connection) { |
| 1411 | + public Connection.ResponseHandler apply(Connection connection) { |
1410 | 1412 | MockRequest thisRequest = MockRequest.this; |
1411 | 1413 | thisRequest.responseHandler = new Connection.ResponseHandler(connection, -1, thisRequest); |
1412 | 1414 | connection.dispatcher.add(thisRequest.responseHandler); |
| 1415 | + return responseHandler; |
1413 | 1416 | } |
1414 | 1417 | }); |
1415 | 1418 | } |
|
0 commit comments