Skip to content

Commit b222356

Browse files
committed
JAVA-893: Make connection pool non-blocking
1 parent 98975cb commit b222356

12 files changed

Lines changed: 552 additions & 399 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- [improvement] JAVA-1293: Make DecoderForStreamIdSize.MAX_FRAME_LENGTH configurable.
1717
- [improvement] JAVA-1053: Add a metric for authentication errors
1818
- [improvement] JAVA-1263: Eliminate unnecessary memory copies in FrameCompressor implementations.
19+
- [improvement] JAVA-893: Make connection pool non-blocking
1920

2021

2122
### 3.0.3

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.datastax.driver.core.exceptions.*;
2020
import com.datastax.driver.core.utils.MoreFutures;
2121
import com.google.common.annotations.VisibleForTesting;
22+
import com.google.common.base.Function;
23+
import com.google.common.base.Objects;
2224
import com.google.common.collect.Lists;
2325
import com.google.common.collect.MapMaker;
2426
import com.google.common.util.concurrent.*;
@@ -95,8 +97,9 @@ enum State {OPEN, TRASHED, RESURRECTING, GONE}
9597

9698
private final AtomicReference<Owner> ownerRef = new AtomicReference<Owner>();
9799

100+
private final ListenableFuture<Connection> thisFuture;
101+
98102
/**
99-
* /**
100103
* Create a new connection to a Cassandra node and associate it with the given pool.
101104
*
102105
* @param name the connection name
@@ -111,6 +114,7 @@ protected Connection(String name, InetSocketAddress address, Factory factory, Ow
111114
this.dispatcher = new Dispatcher();
112115
this.name = name;
113116
this.ownerRef.set(owner);
117+
this.thisFuture = Futures.immediateFuture(this);
114118
}
115119

116120
/**
@@ -483,16 +487,20 @@ void setKeyspace(String keyspace) throws ConnectionException {
483487
}
484488
}
485489

486-
ListenableFuture<Void> setKeyspaceAsync(final String keyspace) throws ConnectionException, BusyConnectionException {
490+
ListenableFuture<Connection> setKeyspaceAsync(final String keyspace) throws ConnectionException, BusyConnectionException {
491+
if (Objects.equal(this.keyspace, keyspace)) {
492+
return thisFuture;
493+
}
494+
487495
logger.trace("{} Setting keyspace {}", this, keyspace);
488496
// Note: we quote the keyspace below, because the name is the one coming from Cassandra, so it's in the right case already
489497
Future future = write(new Requests.Query("USE \"" + keyspace + '"'));
490-
return Futures.transform(future, new AsyncFunction<Message.Response, Void>() {
498+
return Futures.transform(future, new Function<Message.Response, Connection>() {
491499
@Override
492-
public ListenableFuture<Void> apply(Message.Response response) throws Exception {
500+
public Connection apply(Message.Response response) {
493501
if (response instanceof SetKeyspace) {
494502
Connection.this.keyspace = ((SetKeyspace) response).keyspace;
495-
return MoreFutures.VOID_SUCCESS;
503+
return Connection.this;
496504
} else if (response.type == ERROR) {
497505
Responses.Error error = (Responses.Error) response;
498506
throw defunct(error.asException(address));

driver-core/src/main/java/com/datastax/driver/core/HostConnectionPool.java

Lines changed: 75 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.driver.core;
1717

1818
import com.datastax.driver.core.exceptions.AuthenticationException;
19+
import com.datastax.driver.core.exceptions.BusyPoolException;
1920
import com.datastax.driver.core.exceptions.ConnectionException;
2021
import com.datastax.driver.core.exceptions.UnsupportedProtocolVersionException;
2122
import com.datastax.driver.core.utils.MoreFutures;
@@ -26,16 +27,13 @@
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

29-
import java.util.ArrayList;
30-
import java.util.List;
31-
import java.util.ListIterator;
32-
import java.util.Set;
33-
import java.util.concurrent.*;
30+
import java.util.*;
31+
import java.util.concurrent.ConcurrentLinkedQueue;
32+
import java.util.concurrent.CopyOnWriteArrayList;
33+
import java.util.concurrent.CopyOnWriteArraySet;
34+
import java.util.concurrent.Executor;
3435
import java.util.concurrent.atomic.AtomicInteger;
3536
import java.util.concurrent.atomic.AtomicReference;
36-
import java.util.concurrent.locks.Condition;
37-
import java.util.concurrent.locks.Lock;
38-
import java.util.concurrent.locks.ReentrantLock;
3937

4038
import static com.datastax.driver.core.Connection.State.*;
4139

@@ -62,9 +60,8 @@ class HostConnectionPool implements Connection.Owner {
6260
@VisibleForTesting
6361
final Set<Connection> trash = new CopyOnWriteArraySet<Connection>();
6462

65-
private volatile int waiter = 0;
66-
private final Lock waitLock = new ReentrantLock(true);
67-
private final Condition hasAvailableConnection = waitLock.newCondition();
63+
private final Queue<SettableFuture<Connection>> pendingBorrows = new ConcurrentLinkedQueue<SettableFuture<Connection>>();
64+
private final AtomicInteger pendingBorrowCount = new AtomicInteger();
6865

6966
private final Runnable newConnectionTask;
7067

@@ -81,7 +78,7 @@ private enum Phase {INITIALIZING, READY, INIT_FAILED, CLOSING}
8178
// following threshold, we just replace the connection by a new one.
8279
private final int minAllowedStreams;
8380

84-
public HostConnectionPool(Host host, HostDistance hostDistance, SessionManager manager) {
81+
HostConnectionPool(Host host, HostDistance hostDistance, SessionManager manager) {
8582
assert hostDistance != HostDistance.IGNORED;
8683
this.host = host;
8784
this.hostDistance = hostDistance;
@@ -196,17 +193,13 @@ private PoolingOptions options() {
196193
return manager.configuration().getPoolingOptions();
197194
}
198195

199-
public Connection borrowConnection(long timeout, TimeUnit unit) throws ConnectionException, TimeoutException {
196+
ListenableFuture<Connection> borrowConnection(int maxQueueSize) {
200197
Phase phase = this.phase.get();
201198
if (phase != Phase.READY)
202-
// Note: throwing a ConnectionException is probably fine in practice as it will trigger the creation of a new host.
203-
// That being said, maybe having a specific exception could be cleaner.
204-
throw new ConnectionException(host.getSocketAddress(), "Pool is " + phase);
199+
return Futures.immediateFailedFuture(new ConnectionException(host.getSocketAddress(), "Pool is " + phase));
205200

206201
if (connections.isEmpty()) {
207-
if (!host.convictionPolicy.canReconnectNow())
208-
throw new TimeoutException("Connection pool is empty, currently trying to reestablish connections");
209-
else {
202+
if (host.convictionPolicy.canReconnectNow()) {
210203
int coreSize = options().getCoreConnectionsPerHost(hostDistance);
211204
if (coreSize == 0) {
212205
maybeSpawnNewConnection();
@@ -218,10 +211,7 @@ public Connection borrowConnection(long timeout, TimeUnit unit) throws Connectio
218211
manager.blockingExecutor().submit(newConnectionTask);
219212
}
220213
}
221-
Connection c = waitForConnection(timeout, unit);
222-
totalInFlight.incrementAndGet();
223-
c.setKeyspace(manager.poolsState.keyspace);
224-
return c;
214+
return enqueue(maxQueueSize);
225215
}
226216
}
227217

@@ -238,18 +228,17 @@ public Connection borrowConnection(long timeout, TimeUnit unit) throws Connectio
238228
if (leastBusy == null) {
239229
// We could have raced with a shutdown since the last check
240230
if (isClosed())
241-
throw new ConnectionException(host.getSocketAddress(), "Pool is shutdown");
231+
return Futures.immediateFailedFuture(new ConnectionException(host.getSocketAddress(), "Pool is shutdown"));
242232
// This might maybe happen if the number of core connections per host is 0 and a connection was trashed between
243233
// the previous check to connections and now. But in that case, the line above will have trigger the creation of
244234
// a new connection, so just wait that connection and move on
245-
leastBusy = waitForConnection(timeout, unit);
235+
return enqueue(maxQueueSize);
246236
} else {
247237
while (true) {
248238
int inFlight = leastBusy.inFlight.get();
249239

250240
if (inFlight >= Math.min(leastBusy.maxAvailableStreams(), options().getMaxRequestsPerConnection(hostDistance))) {
251-
leastBusy = waitForConnection(timeout, unit);
252-
break;
241+
return enqueue(maxQueueSize);
253242
}
254243

255244
if (leastBusy.inFlight.compareAndSet(inFlight, inFlight + 1))
@@ -276,96 +265,37 @@ public Connection borrowConnection(long timeout, TimeUnit unit) throws Connectio
276265
maybeSpawnNewConnection();
277266
}
278267

279-
leastBusy.setKeyspace(manager.poolsState.keyspace);
280-
return leastBusy;
281-
}
282-
283-
private void awaitAvailableConnection(long timeout, TimeUnit unit) throws InterruptedException {
284-
waitLock.lock();
285-
waiter++;
286-
try {
287-
hasAvailableConnection.await(timeout, unit);
288-
} finally {
289-
waiter--;
290-
waitLock.unlock();
291-
}
268+
return leastBusy.setKeyspaceAsync(manager.poolsState.keyspace);
292269
}
293270

294-
private void signalAvailableConnection() {
295-
// Quick check if it's worth signaling to avoid locking
296-
if (waiter == 0)
297-
return;
298-
299-
waitLock.lock();
300-
try {
301-
hasAvailableConnection.signal();
302-
} finally {
303-
waitLock.unlock();
271+
private ListenableFuture<Connection> enqueue(int maxQueueSize) {
272+
if (maxQueueSize == 0) {
273+
return Futures.immediateFailedFuture(new BusyPoolException(host.getSocketAddress(), 0));
304274
}
305-
}
306-
307-
private void signalAllAvailableConnection() {
308-
// Quick check if it's worth signaling to avoid locking
309-
if (waiter == 0)
310-
return;
311275

312-
waitLock.lock();
313-
try {
314-
hasAvailableConnection.signalAll();
315-
} finally {
316-
waitLock.unlock();
317-
}
318-
}
319-
320-
private Connection waitForConnection(long timeout, TimeUnit unit) throws ConnectionException, TimeoutException {
321-
if (timeout == 0)
322-
throw new TimeoutException("All connections are busy and pool timeout is 0");
323-
324-
long start = System.nanoTime();
325-
long remaining = timeout;
326-
do {
327-
try {
328-
awaitAvailableConnection(remaining, unit);
329-
} catch (InterruptedException e) {
330-
Thread.currentThread().interrupt();
331-
// If we're interrupted fine, check if there is a connection available but stop waiting otherwise
332-
timeout = 0; // this will make us stop the loop if we don't get a connection right away
276+
while (true) {
277+
int count = pendingBorrowCount.get();
278+
if (count >= maxQueueSize) {
279+
return Futures.immediateFailedFuture(new BusyPoolException(host.getSocketAddress(), maxQueueSize));
333280
}
334-
335-
if (isClosed())
336-
throw new ConnectionException(host.getSocketAddress(), "Pool is shutdown");
337-
338-
int minInFlight = Integer.MAX_VALUE;
339-
Connection leastBusy = null;
340-
for (Connection connection : connections) {
341-
int inFlight = connection.inFlight.get();
342-
if (inFlight < minInFlight) {
343-
minInFlight = inFlight;
344-
leastBusy = connection;
345-
}
281+
if (pendingBorrowCount.compareAndSet(count, count + 1)) {
282+
break;
346283
}
284+
}
347285

348-
// If we race with shutdown, leastBusy could be null. In that case we just loop and we'll throw on the next
349-
// iteration anyway
350-
if (leastBusy != null) {
351-
while (true) {
352-
int inFlight = leastBusy.inFlight.get();
353-
354-
if (inFlight >= Math.min(leastBusy.maxAvailableStreams(), options().getMaxRequestsPerConnection(hostDistance)))
355-
break;
356-
357-
if (leastBusy.inFlight.compareAndSet(inFlight, inFlight + 1))
358-
return leastBusy;
359-
}
360-
}
286+
SettableFuture<Connection> future = SettableFuture.create();
287+
pendingBorrows.add(future);
361288

362-
remaining = timeout - Cluster.timeSince(start, unit);
363-
} while (remaining > 0);
289+
// If we raced with shutdown, make sure the future will be completed. This has no effect if it was properly
290+
// handled in closeAsync.
291+
if (phase.get() == Phase.CLOSING) {
292+
future.setException(new ConnectionException(host.getSocketAddress(), "Pool is shutdown"));
293+
}
364294

365-
throw new TimeoutException("All connections are busy");
295+
return future;
366296
}
367297

368-
public void returnConnection(Connection connection) {
298+
void returnConnection(Connection connection) {
369299
connection.inFlight.decrementAndGet();
370300
totalInFlight.decrementAndGet();
371301

@@ -384,7 +314,37 @@ public void returnConnection(Connection connection) {
384314
if (connection.maxAvailableStreams() < minAllowedStreams) {
385315
replaceConnection(connection);
386316
} else {
387-
signalAvailableConnection();
317+
dequeue(connection);
318+
}
319+
}
320+
}
321+
322+
// When a connection gets returned to the pool, check if there are pending borrows that can be completed with it.
323+
private void dequeue(Connection connection) {
324+
while (!pendingBorrows.isEmpty()) {
325+
326+
// We can only reuse the connection if it's under its maximum number of inFlight requests.
327+
// Do this atomically, as we could be competing with other borrowConnection or dequeue calls.
328+
while (true) {
329+
int inFlight = connection.inFlight.get();
330+
if (inFlight >= Math.min(connection.maxAvailableStreams(), options().getMaxRequestsPerConnection(hostDistance))) {
331+
// Connection is full again, stop dequeuing
332+
return;
333+
}
334+
if (connection.inFlight.compareAndSet(inFlight, inFlight + 1)) {
335+
// We acquired the right to reuse the connection for one request, proceed
336+
break;
337+
}
338+
}
339+
340+
SettableFuture<Connection> pendingBorrow = pendingBorrows.poll();
341+
if (pendingBorrow == null) {
342+
// Another thread has emptied the queue since our last check, restore the count
343+
connection.inFlight.decrementAndGet();
344+
} else {
345+
totalInFlight.incrementAndGet();
346+
pendingBorrowCount.decrementAndGet();
347+
pendingBorrow.set(connection);
388348
}
389349
}
390350
}
@@ -465,7 +425,7 @@ private boolean addConnectionIfUnderMaximum() {
465425
return false;
466426
}
467427

468-
signalAvailableConnection();
428+
dequeue(newConnection);
469429
return true;
470430
} catch (InterruptedException e) {
471431
Thread.currentThread().interrupt();
@@ -600,20 +560,21 @@ private void close(final Connection connection) {
600560
connection.closeAsync();
601561
}
602562

603-
public final boolean isClosed() {
563+
final boolean isClosed() {
604564
return closeFuture.get() != null;
605565
}
606566

607-
public final CloseFuture closeAsync() {
567+
final CloseFuture closeAsync() {
608568

609569
CloseFuture future = closeFuture.get();
610570
if (future != null)
611571
return future;
612572

613573
phase.set(Phase.CLOSING);
614574

615-
// Wake up all threads that wait
616-
signalAllAvailableConnection();
575+
for (SettableFuture<Connection> pendingBorrow : pendingBorrows) {
576+
pendingBorrow.setException(new ConnectionException(host.getSocketAddress(), "Pool is shutdown"));
577+
}
617578

618579
future = new CloseFuture.Forwarding(discardAvailableConnections());
619580

@@ -622,7 +583,7 @@ public final CloseFuture closeAsync() {
622583
: closeFuture.get(); // We raced, it's ok, return the future that was actually set
623584
}
624585

625-
public int opened() {
586+
int opened() {
626587
return open.get();
627588
}
628589

@@ -657,7 +618,7 @@ public void run() {
657618

658619
// This creates connections if we have less than core connections (if we
659620
// have more than core, connection will just get trash when we can).
660-
public void ensureCoreConnections() {
621+
void ensureCoreConnections() {
661622
if (isClosed())
662623
return;
663624

0 commit comments

Comments
 (0)