2424import com .google .common .base .Throwables ;
2525import com .google .common .collect .Lists ;
2626import com .google .common .util .concurrent .*;
27+ import io .netty .util .concurrent .EventExecutor ;
2728import org .slf4j .Logger ;
2829import org .slf4j .LoggerFactory ;
2930
@@ -57,14 +58,16 @@ class HostConnectionPool implements Connection.Owner {
5758 @ VisibleForTesting
5859 final Set <Connection > trash = new CopyOnWriteArraySet <Connection >();
5960
60- private final Queue <SettableFuture < Connection >> pendingBorrows = new ConcurrentLinkedQueue <SettableFuture < Connection > >();
61+ private final Queue <PendingBorrow > pendingBorrows = new ConcurrentLinkedQueue <PendingBorrow >();
6162 private final AtomicInteger pendingBorrowCount = new AtomicInteger ();
6263
6364 private final Runnable newConnectionTask ;
6465
6566 private final AtomicInteger scheduledForCreation = new AtomicInteger ();
6667
67- protected final AtomicReference <CloseFuture > closeFuture = new AtomicReference <CloseFuture >();
68+ private final EventExecutor timeoutsExecutor ;
69+
70+ private final AtomicReference <CloseFuture > closeFuture = new AtomicReference <CloseFuture >();
6871
6972 private enum Phase {INITIALIZING , READY , INIT_FAILED , CLOSING }
7073
@@ -93,6 +96,8 @@ public void run() {
9396 this .open = new AtomicInteger ();
9497
9598 this .minAllowedStreams = options ().getMaxRequestsPerConnection (hostDistance ) * 3 / 4 ;
99+
100+ this .timeoutsExecutor = manager .getCluster ().manager .connectionFactory .eventLoopGroup .next ();
96101 }
97102
98103 /**
@@ -190,7 +195,7 @@ private PoolingOptions options() {
190195 return manager .configuration ().getPoolingOptions ();
191196 }
192197
193- ListenableFuture <Connection > borrowConnection (int maxQueueSize ) {
198+ ListenableFuture <Connection > borrowConnection (long timeout , TimeUnit unit , int maxQueueSize ) {
194199 Phase phase = this .phase .get ();
195200 if (phase != Phase .READY )
196201 return Futures .immediateFailedFuture (new ConnectionException (host .getSocketAddress (), "Pool is " + phase ));
@@ -207,7 +212,7 @@ ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
207212 manager .blockingExecutor ().submit (newConnectionTask );
208213 }
209214 }
210- return enqueue (maxQueueSize );
215+ return enqueue (timeout , unit , maxQueueSize );
211216 }
212217 }
213218
@@ -228,13 +233,13 @@ ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
228233 // This might maybe happen if the number of core connections per host is 0 and a connection was trashed between
229234 // the previous check to connections and now. But in that case, the line above will have trigger the creation of
230235 // a new connection, so just wait that connection and move on
231- return enqueue (maxQueueSize );
236+ return enqueue (timeout , unit , maxQueueSize );
232237 } else {
233238 while (true ) {
234239 int inFlight = leastBusy .inFlight .get ();
235240
236241 if (inFlight >= Math .min (leastBusy .maxAvailableStreams (), options ().getMaxRequestsPerConnection (hostDistance ))) {
237- return enqueue (maxQueueSize );
242+ return enqueue (timeout , unit , maxQueueSize );
238243 }
239244
240245 if (leastBusy .inFlight .compareAndSet (inFlight , inFlight + 1 ))
@@ -264,8 +269,8 @@ ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
264269 return leastBusy .setKeyspaceAsync (manager .poolsState .keyspace );
265270 }
266271
267- private ListenableFuture <Connection > enqueue (int maxQueueSize ) {
268- if (maxQueueSize == 0 ) {
272+ private ListenableFuture <Connection > enqueue (long timeout , TimeUnit unit , int maxQueueSize ) {
273+ if (timeout == 0 || maxQueueSize == 0 ) {
269274 return Futures .immediateFailedFuture (new BusyPoolException (host .getSocketAddress (), 0 ));
270275 }
271276
@@ -279,16 +284,16 @@ private ListenableFuture<Connection> enqueue(int maxQueueSize) {
279284 }
280285 }
281286
282- SettableFuture < Connection > future = SettableFuture . create ( );
283- pendingBorrows .add (future );
287+ PendingBorrow pendingBorrow = new PendingBorrow ( timeout , unit , timeoutsExecutor );
288+ pendingBorrows .add (pendingBorrow );
284289
285290 // If we raced with shutdown, make sure the future will be completed. This has no effect if it was properly
286291 // handled in closeAsync.
287292 if (phase .get () == Phase .CLOSING ) {
288- future .setException (new ConnectionException (host .getSocketAddress (), "Pool is shutdown" ));
293+ pendingBorrow .setException (new ConnectionException (host .getSocketAddress (), "Pool is shutdown" ));
289294 }
290295
291- return future ;
296+ return pendingBorrow . future ;
292297 }
293298
294299 void returnConnection (Connection connection ) {
@@ -316,7 +321,7 @@ void returnConnection(Connection connection) {
316321 }
317322
318323 // When a connection gets returned to the pool, check if there are pending borrows that can be completed with it.
319- private void dequeue (Connection connection ) {
324+ private void dequeue (final Connection connection ) {
320325 while (!pendingBorrows .isEmpty ()) {
321326
322327 // We can only reuse the connection if it's under its maximum number of inFlight requests.
@@ -333,34 +338,43 @@ private void dequeue(Connection connection) {
333338 }
334339 }
335340
336- final SettableFuture < Connection > pendingBorrow = pendingBorrows .poll ();
341+ final PendingBorrow pendingBorrow = pendingBorrows .poll ();
337342 if (pendingBorrow == null ) {
338343 // Another thread has emptied the queue since our last check, restore the count
339344 connection .inFlight .decrementAndGet ();
340345 } else {
341- totalInFlight .incrementAndGet ();
342346 pendingBorrowCount .decrementAndGet ();
343347 // Ensure that the keyspace set on the connection is the one set on the pool state, in the general case it will be.
344348 ListenableFuture <Connection > setKeyspaceFuture = connection .setKeyspaceAsync (manager .poolsState .keyspace );
345349 // Slight optimization, if the keyspace was already correct the future will be complete, so simply complete it here.
346350 if (setKeyspaceFuture .isDone ()) {
347351 try {
348- pendingBorrow .set (Uninterruptibles .getUninterruptibly (setKeyspaceFuture ));
352+ if (pendingBorrow .set (Uninterruptibles .getUninterruptibly (setKeyspaceFuture ))) {
353+ totalInFlight .incrementAndGet ();
354+ } else {
355+ connection .inFlight .decrementAndGet ();
356+ }
349357 } catch (ExecutionException e ) {
350358 pendingBorrow .setException (e .getCause ());
359+ connection .inFlight .decrementAndGet ();
351360 }
352361 } else {
353362 // Otherwise the keyspace did need to be set, tie the pendingBorrow future to the set keyspace completion.
354363 Futures .addCallback (setKeyspaceFuture , new FutureCallback <Connection >() {
355364
356365 @ Override
357366 public void onSuccess (Connection c ) {
358- pendingBorrow .set (c );
367+ if (pendingBorrow .set (c )) {
368+ totalInFlight .incrementAndGet ();
369+ } else {
370+ connection .inFlight .decrementAndGet ();
371+ }
359372 }
360373
361374 @ Override
362375 public void onFailure (Throwable t ) {
363376 pendingBorrow .setException (t );
377+ connection .inFlight .decrementAndGet ();
364378 }
365379 });
366380 }
@@ -592,7 +606,7 @@ final CloseFuture closeAsync() {
592606
593607 phase .set (Phase .CLOSING );
594608
595- for (SettableFuture < Connection > pendingBorrow : pendingBorrows ) {
609+ for (PendingBorrow pendingBorrow : pendingBorrows ) {
596610 pendingBorrow .setException (new ConnectionException (host .getSocketAddress (), "Pool is shutdown" ));
597611 }
598612
@@ -664,4 +678,31 @@ void setKeyspace(String keyspace) {
664678 this .keyspace = keyspace ;
665679 }
666680 }
681+
682+ private class PendingBorrow {
683+ final SettableFuture <Connection > future ;
684+ final Future <?> timeoutTask ;
685+
686+ PendingBorrow (final long timeout , final TimeUnit unit , EventExecutor timeoutsExecutor ) {
687+ this .future = SettableFuture .create ();
688+ this .timeoutTask = timeoutsExecutor .schedule (new Runnable () {
689+ @ Override
690+ public void run () {
691+ future .setException (
692+ new BusyPoolException (host .getSocketAddress (), timeout , unit ));
693+ }
694+ }, timeout , unit );
695+ }
696+
697+ boolean set (Connection connection ) {
698+ boolean succeeded = this .future .set (connection );
699+ this .timeoutTask .cancel (false );
700+ return succeeded ;
701+ }
702+
703+ void setException (Throwable exception ) {
704+ this .future .setException (exception );
705+ this .timeoutTask .cancel (false );
706+ }
707+ }
667708}
0 commit comments