1616package com .datastax .driver .core ;
1717
1818import com .datastax .driver .core .exceptions .AuthenticationException ;
19+ import com .datastax .driver .core .exceptions .BusyPoolException ;
1920import com .datastax .driver .core .exceptions .ConnectionException ;
2021import com .datastax .driver .core .exceptions .UnsupportedProtocolVersionException ;
2122import com .datastax .driver .core .utils .MoreFutures ;
2627import org .slf4j .Logger ;
2728import org .slf4j .LoggerFactory ;
2829
29- import java .util .ArrayList ;
30- import java .util .List ;
31- import java .util .ListIterator ;
32- import java .util .Set ;
33- import java .util .concurrent .* ;
30+ import java .util .* ;
31+ import java .util .concurrent . ConcurrentLinkedQueue ;
32+ import java .util .concurrent . CopyOnWriteArrayList ;
33+ import java .util .concurrent . CopyOnWriteArraySet ;
34+ import java .util .concurrent .Executor ;
3435import java .util .concurrent .atomic .AtomicInteger ;
3536import java .util .concurrent .atomic .AtomicReference ;
36- import java .util .concurrent .locks .Condition ;
37- import java .util .concurrent .locks .Lock ;
38- import java .util .concurrent .locks .ReentrantLock ;
3937
4038import static com .datastax .driver .core .Connection .State .*;
4139
@@ -62,9 +60,8 @@ class HostConnectionPool implements Connection.Owner {
6260 @ VisibleForTesting
6361 final Set <Connection > trash = new CopyOnWriteArraySet <Connection >();
6462
65- private volatile int waiter = 0 ;
66- private final Lock waitLock = new ReentrantLock (true );
67- private final Condition hasAvailableConnection = waitLock .newCondition ();
63+ private final Queue <SettableFuture <Connection >> pendingBorrows = new ConcurrentLinkedQueue <SettableFuture <Connection >>();
64+ private final AtomicInteger pendingBorrowCount = new AtomicInteger ();
6865
6966 private final Runnable newConnectionTask ;
7067
@@ -81,7 +78,7 @@ private enum Phase {INITIALIZING, READY, INIT_FAILED, CLOSING}
8178 // following threshold, we just replace the connection by a new one.
8279 private final int minAllowedStreams ;
8380
84- public HostConnectionPool (Host host , HostDistance hostDistance , SessionManager manager ) {
81+ HostConnectionPool (Host host , HostDistance hostDistance , SessionManager manager ) {
8582 assert hostDistance != HostDistance .IGNORED ;
8683 this .host = host ;
8784 this .hostDistance = hostDistance ;
@@ -196,17 +193,13 @@ private PoolingOptions options() {
196193 return manager .configuration ().getPoolingOptions ();
197194 }
198195
199- public Connection borrowConnection (long timeout , TimeUnit unit ) throws ConnectionException , TimeoutException {
196+ ListenableFuture < Connection > borrowConnection (int maxQueueSize ) {
200197 Phase phase = this .phase .get ();
201198 if (phase != Phase .READY )
202- // Note: throwing a ConnectionException is probably fine in practice as it will trigger the creation of a new host.
203- // That being said, maybe having a specific exception could be cleaner.
204- throw new ConnectionException (host .getSocketAddress (), "Pool is " + phase );
199+ return Futures .immediateFailedFuture (new ConnectionException (host .getSocketAddress (), "Pool is " + phase ));
205200
206201 if (connections .isEmpty ()) {
207- if (!host .convictionPolicy .canReconnectNow ())
208- throw new TimeoutException ("Connection pool is empty, currently trying to reestablish connections" );
209- else {
202+ if (host .convictionPolicy .canReconnectNow ()) {
210203 int coreSize = options ().getCoreConnectionsPerHost (hostDistance );
211204 if (coreSize == 0 ) {
212205 maybeSpawnNewConnection ();
@@ -218,10 +211,7 @@ public Connection borrowConnection(long timeout, TimeUnit unit) throws Connectio
218211 manager .blockingExecutor ().submit (newConnectionTask );
219212 }
220213 }
221- Connection c = waitForConnection (timeout , unit );
222- totalInFlight .incrementAndGet ();
223- c .setKeyspace (manager .poolsState .keyspace );
224- return c ;
214+ return enqueue (maxQueueSize );
225215 }
226216 }
227217
@@ -238,18 +228,17 @@ public Connection borrowConnection(long timeout, TimeUnit unit) throws Connectio
238228 if (leastBusy == null ) {
239229 // We could have raced with a shutdown since the last check
240230 if (isClosed ())
241- throw new ConnectionException (host .getSocketAddress (), "Pool is shutdown" );
231+ return Futures . immediateFailedFuture ( new ConnectionException (host .getSocketAddress (), "Pool is shutdown" ) );
242232 // This might maybe happen if the number of core connections per host is 0 and a connection was trashed between
243233 // the previous check to connections and now. But in that case, the line above will have trigger the creation of
244234 // a new connection, so just wait that connection and move on
245- leastBusy = waitForConnection ( timeout , unit );
235+ return enqueue ( maxQueueSize );
246236 } else {
247237 while (true ) {
248238 int inFlight = leastBusy .inFlight .get ();
249239
250240 if (inFlight >= Math .min (leastBusy .maxAvailableStreams (), options ().getMaxRequestsPerConnection (hostDistance ))) {
251- leastBusy = waitForConnection (timeout , unit );
252- break ;
241+ return enqueue (maxQueueSize );
253242 }
254243
255244 if (leastBusy .inFlight .compareAndSet (inFlight , inFlight + 1 ))
@@ -276,96 +265,37 @@ public Connection borrowConnection(long timeout, TimeUnit unit) throws Connectio
276265 maybeSpawnNewConnection ();
277266 }
278267
279- leastBusy .setKeyspace (manager .poolsState .keyspace );
280- return leastBusy ;
281- }
282-
283- private void awaitAvailableConnection (long timeout , TimeUnit unit ) throws InterruptedException {
284- waitLock .lock ();
285- waiter ++;
286- try {
287- hasAvailableConnection .await (timeout , unit );
288- } finally {
289- waiter --;
290- waitLock .unlock ();
291- }
268+ return leastBusy .setKeyspaceAsync (manager .poolsState .keyspace );
292269 }
293270
294- private void signalAvailableConnection () {
295- // Quick check if it's worth signaling to avoid locking
296- if (waiter == 0 )
297- return ;
298-
299- waitLock .lock ();
300- try {
301- hasAvailableConnection .signal ();
302- } finally {
303- waitLock .unlock ();
271+ private ListenableFuture <Connection > enqueue (int maxQueueSize ) {
272+ if (maxQueueSize == 0 ) {
273+ return Futures .immediateFailedFuture (new BusyPoolException (host .getSocketAddress (), 0 ));
304274 }
305- }
306-
307- private void signalAllAvailableConnection () {
308- // Quick check if it's worth signaling to avoid locking
309- if (waiter == 0 )
310- return ;
311275
312- waitLock .lock ();
313- try {
314- hasAvailableConnection .signalAll ();
315- } finally {
316- waitLock .unlock ();
317- }
318- }
319-
320- private Connection waitForConnection (long timeout , TimeUnit unit ) throws ConnectionException , TimeoutException {
321- if (timeout == 0 )
322- throw new TimeoutException ("All connections are busy and pool timeout is 0" );
323-
324- long start = System .nanoTime ();
325- long remaining = timeout ;
326- do {
327- try {
328- awaitAvailableConnection (remaining , unit );
329- } catch (InterruptedException e ) {
330- Thread .currentThread ().interrupt ();
331- // If we're interrupted fine, check if there is a connection available but stop waiting otherwise
332- timeout = 0 ; // this will make us stop the loop if we don't get a connection right away
276+ while (true ) {
277+ int count = pendingBorrowCount .get ();
278+ if (count >= maxQueueSize ) {
279+ return Futures .immediateFailedFuture (new BusyPoolException (host .getSocketAddress (), maxQueueSize ));
333280 }
334-
335- if (isClosed ())
336- throw new ConnectionException (host .getSocketAddress (), "Pool is shutdown" );
337-
338- int minInFlight = Integer .MAX_VALUE ;
339- Connection leastBusy = null ;
340- for (Connection connection : connections ) {
341- int inFlight = connection .inFlight .get ();
342- if (inFlight < minInFlight ) {
343- minInFlight = inFlight ;
344- leastBusy = connection ;
345- }
281+ if (pendingBorrowCount .compareAndSet (count , count + 1 )) {
282+ break ;
346283 }
284+ }
347285
348- // If we race with shutdown, leastBusy could be null. In that case we just loop and we'll throw on the next
349- // iteration anyway
350- if (leastBusy != null ) {
351- while (true ) {
352- int inFlight = leastBusy .inFlight .get ();
353-
354- if (inFlight >= Math .min (leastBusy .maxAvailableStreams (), options ().getMaxRequestsPerConnection (hostDistance )))
355- break ;
356-
357- if (leastBusy .inFlight .compareAndSet (inFlight , inFlight + 1 ))
358- return leastBusy ;
359- }
360- }
286+ SettableFuture <Connection > future = SettableFuture .create ();
287+ pendingBorrows .add (future );
361288
362- remaining = timeout - Cluster .timeSince (start , unit );
363- } while (remaining > 0 );
289+ // If we raced with shutdown, make sure the future will be completed. This has no effect if it was properly
290+ // handled in closeAsync.
291+ if (phase .get () == Phase .CLOSING ) {
292+ future .setException (new ConnectionException (host .getSocketAddress (), "Pool is shutdown" ));
293+ }
364294
365- throw new TimeoutException ( "All connections are busy" ) ;
295+ return future ;
366296 }
367297
368- public void returnConnection (Connection connection ) {
298+ void returnConnection (Connection connection ) {
369299 connection .inFlight .decrementAndGet ();
370300 totalInFlight .decrementAndGet ();
371301
@@ -384,7 +314,37 @@ public void returnConnection(Connection connection) {
384314 if (connection .maxAvailableStreams () < minAllowedStreams ) {
385315 replaceConnection (connection );
386316 } else {
387- signalAvailableConnection ();
317+ dequeue (connection );
318+ }
319+ }
320+ }
321+
322+ // When a connection gets returned to the pool, check if there are pending borrows that can be completed with it.
323+ private void dequeue (Connection connection ) {
324+ while (!pendingBorrows .isEmpty ()) {
325+
326+ // We can only reuse the connection if it's under its maximum number of inFlight requests.
327+ // Do this atomically, as we could be competing with other borrowConnection or dequeue calls.
328+ while (true ) {
329+ int inFlight = connection .inFlight .get ();
330+ if (inFlight >= Math .min (connection .maxAvailableStreams (), options ().getMaxRequestsPerConnection (hostDistance ))) {
331+ // Connection is full again, stop dequeuing
332+ return ;
333+ }
334+ if (connection .inFlight .compareAndSet (inFlight , inFlight + 1 )) {
335+ // We acquired the right to reuse the connection for one request, proceed
336+ break ;
337+ }
338+ }
339+
340+ SettableFuture <Connection > pendingBorrow = pendingBorrows .poll ();
341+ if (pendingBorrow == null ) {
342+ // Another thread has emptied the queue since our last check, restore the count
343+ connection .inFlight .decrementAndGet ();
344+ } else {
345+ totalInFlight .incrementAndGet ();
346+ pendingBorrowCount .decrementAndGet ();
347+ pendingBorrow .set (connection );
388348 }
389349 }
390350 }
@@ -465,7 +425,7 @@ private boolean addConnectionIfUnderMaximum() {
465425 return false ;
466426 }
467427
468- signalAvailableConnection ( );
428+ dequeue ( newConnection );
469429 return true ;
470430 } catch (InterruptedException e ) {
471431 Thread .currentThread ().interrupt ();
@@ -600,20 +560,21 @@ private void close(final Connection connection) {
600560 connection .closeAsync ();
601561 }
602562
603- public final boolean isClosed () {
563+ final boolean isClosed () {
604564 return closeFuture .get () != null ;
605565 }
606566
607- public final CloseFuture closeAsync () {
567+ final CloseFuture closeAsync () {
608568
609569 CloseFuture future = closeFuture .get ();
610570 if (future != null )
611571 return future ;
612572
613573 phase .set (Phase .CLOSING );
614574
615- // Wake up all threads that wait
616- signalAllAvailableConnection ();
575+ for (SettableFuture <Connection > pendingBorrow : pendingBorrows ) {
576+ pendingBorrow .setException (new ConnectionException (host .getSocketAddress (), "Pool is shutdown" ));
577+ }
617578
618579 future = new CloseFuture .Forwarding (discardAvailableConnections ());
619580
@@ -622,7 +583,7 @@ public final CloseFuture closeAsync() {
622583 : closeFuture .get (); // We raced, it's ok, return the future that was actually set
623584 }
624585
625- public int opened () {
586+ int opened () {
626587 return open .get ();
627588 }
628589
@@ -657,7 +618,7 @@ public void run() {
657618
658619 // This creates connections if we have less than core connections (if we
659620 // have more than core, connection will just get trash when we can).
660- public void ensureCoreConnections () {
621+ void ensureCoreConnections () {
661622 if (isClosed ())
662623 return ;
663624
0 commit comments