Skip to content

Commit 4e680fb

Browse files
committed
JAVA-1519: Close channel if number of "orphan" stream ids exceeds a configurable threshold
Motivation: Sometimes the driver might stop waiting for a response on a particular channel (for example if the request timed out, or was completed by another speculative execution). Before this change we just kept the response callback in our inflight map, which is a problem if we never get the response: it creates a memory leak, and if this keeps happening we'll eventually run out of stream ids on the channel. Modifications: Add a channel method to cancel a response callback, indicating that the caller is not interested in the response anymore. Modify existing clients to cancel their callbacks. Track the number of "orphan" stream ids (cancelled and have not yet received a response from the server). Initiate a graceful shutdown if this number exceeds a threshold. Improve the way the channel pool manages channel shutdowns: start the reconnection as soon as an orderly shutdown has *started*, not when it finishes. Result: Cancelled callbacks are not leaked anymore. When the number of orphan ids exceed the threshold, the channel is closed gracefully and the pool starts replacing it immediately.
1 parent 8eef599 commit 4e680fb

19 files changed

Lines changed: 529 additions & 106 deletions

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### 4.0.0-alpha1 (in progress)
66

7+
- [new feature] JAVA-1519: Close channel if number of orphan stream ids exceeds a configurable
8+
threshold
79
- [new feature] JAVA-1529: Make configuration reloadable
810
- [new feature] JAVA-1502: Reprepare statements on newly added/up nodes
911
- [new feature] JAVA-1530: Add ResultSet.wasApplied

core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public enum CoreDriverOption implements DriverOption {
3232
CONNECTION_MAX_REQUESTS("connection.max-requests-per-connection", true),
3333
CONNECTION_HEARTBEAT_INTERVAL("connection.heartbeat.interval", true),
3434
CONNECTION_HEARTBEAT_TIMEOUT("connection.heartbeat.timeout", true),
35+
CONNECTION_MAX_ORPHAN_REQUESTS("connection.max-orphan-requests", true),
3536

3637
REQUEST_TIMEOUT("request.timeout", true),
3738
REQUEST_CONSISTENCY("request.consistency", true),

core/src/main/java/com/datastax/oss/driver/internal/core/adminrequest/AdminRequestHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ private void onWriteComplete(Future<? super Void> future) {
117117
private void fireTimeout() {
118118
result.completeExceptionally(
119119
new DriverTimeoutException(String.format("%s timed out after %s", debugString, timeout)));
120+
if (!channel.closeFuture().isDone()) {
121+
channel.cancel(this);
122+
}
120123
}
121124

122125
@Override

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.netty.channel.ChannelInitializer;
3232
import io.netty.channel.ChannelOption;
3333
import io.netty.channel.ChannelPipeline;
34+
import io.netty.channel.ChannelPromise;
3435
import java.net.SocketAddress;
3536
import java.util.List;
3637
import java.util.Optional;
@@ -185,13 +186,19 @@ protected void initChannel(Channel channel) throws Exception {
185186
(int) defaultConfigProfile.getBytes(CoreDriverOption.CONNECTION_MAX_FRAME_LENGTH);
186187
int maxRequestsPerConnection =
187188
defaultConfigProfile.getInt(CoreDriverOption.CONNECTION_MAX_REQUESTS);
189+
int maxOrphanRequests =
190+
defaultConfigProfile.getInt(CoreDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS);
191+
192+
ChannelPromise closeStartedFuture = channel.newPromise();
188193

189194
InFlightHandler inFlightHandler =
190195
new InFlightHandler(
191196
protocolVersion,
192197
new StreamIdGenerator(maxRequestsPerConnection),
198+
maxOrphanRequests,
193199
setKeyspaceTimeoutMillis,
194200
availableIdsHolder,
201+
closeStartedFuture,
195202
options.eventCallback,
196203
options.ownerLogPrefix);
197204
ProtocolInitHandler initHandler =

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelHandlerRequest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public final void onFailure(Throwable error) {
8686

8787
private void onTimeout() {
8888
fail(new DriverTimeoutException(describe() + ": timed out after " + timeoutMillis + " ms"));
89+
if (!channel.closeFuture().isDone()) {
90+
// Cancel the response callback
91+
channel.writeAndFlush(this);
92+
}
8993
}
9094

9195
void failOnUnexpected(Message response) {

core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class DriverChannel {
4343
static final Object FORCEFUL_CLOSE_MESSAGE = new String("FORCEFUL_CLOSE_MESSAGE");
4444

4545
private final Channel channel;
46+
private final ChannelFuture closeStartedFuture;
4647
private final WriteCoalescer writeCoalescer;
4748
private final AvailableIdsHolder availableIdsHolder;
4849
private final ProtocolVersion protocolVersion;
@@ -55,6 +56,7 @@ public class DriverChannel {
5556
AvailableIdsHolder availableIdsHolder,
5657
ProtocolVersion protocolVersion) {
5758
this.channel = channel;
59+
this.closeStartedFuture = channel.pipeline().get(InFlightHandler.class).closeStartedFuture;
5860
this.writeCoalescer = writeCoalescer;
5961
this.availableIdsHolder = availableIdsHolder;
6062
this.protocolVersion = protocolVersion;
@@ -76,6 +78,22 @@ public Future<Void> write(
7678
return writeCoalescer.writeAndFlush(channel, message);
7779
}
7880

81+
/**
82+
* Cancels a callback, indicating that the client that wrote it is no longer interested in the
83+
* answer.
84+
*
85+
* <p>Note that this does not cancel the request server-side (but might in the future if Cassandra
86+
* supports it).
87+
*/
88+
public void cancel(ResponseCallback responseCallback) {
89+
if (closing.get()) {
90+
throw new IllegalStateException("Driver channel is closing");
91+
}
92+
// To avoid creating an extra message, we adopt the convention that writing the callback
93+
// directly means cancellation
94+
writeCoalescer.writeAndFlush(channel, responseCallback);
95+
}
96+
7997
/**
8098
* Releases a stream id if the client was holding onto it, and has now determined that it can be
8199
* safely reused.
@@ -156,7 +174,24 @@ public Future<Void> forceClose() {
156174
return channel.closeFuture();
157175
}
158176

159-
/** Does not close the channel, but returns a future that will complete when it does. */
177+
/**
178+
* Returns a future that will complete when a graceful close has started, but not yet completed.
179+
*
180+
* <p>In other words, the channel has stopped accepting new requests, but is still waiting for
181+
* pending requests to finish. Once the last response has been received, the channel will really
182+
* close and {@link #closeFuture()} will be completed.
183+
*
184+
* <p>If there were no pending requests when the graceful shutdown was initiated, or if {@link
185+
* #forceClose()} is called first, this future will never complete.
186+
*/
187+
public ChannelFuture closeStartedFuture() {
188+
return this.closeStartedFuture;
189+
}
190+
191+
/**
192+
* Does not close the channel, but returns a future that will complete when it is completely
193+
* closed.
194+
*/
160195
public ChannelFuture closeFuture() {
161196
return channel.closeFuture();
162197
}

core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@
2828
import com.datastax.oss.protocol.internal.Message;
2929
import com.datastax.oss.protocol.internal.request.Query;
3030
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
31-
import com.google.common.collect.Maps;
31+
import com.google.common.collect.BiMap;
32+
import com.google.common.collect.HashBiMap;
3233
import io.netty.channel.ChannelDuplexHandler;
3334
import io.netty.channel.ChannelFuture;
3435
import io.netty.channel.ChannelHandlerContext;
3536
import io.netty.channel.ChannelPromise;
3637
import io.netty.util.concurrent.Promise;
37-
import java.util.Map;
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
4040

@@ -44,28 +44,35 @@ public class InFlightHandler extends ChannelDuplexHandler {
4444

4545
private final ProtocolVersion protocolVersion;
4646
private final StreamIdGenerator streamIds;
47+
final ChannelPromise closeStartedFuture;
4748
private final String ownerLogPrefix;
48-
private final Map<Integer, ResponseCallback> inFlight;
49+
private final BiMap<Integer, ResponseCallback> inFlight;
4950
private final long setKeyspaceTimeoutMillis;
5051
private final AvailableIdsHolder availableIdsHolder;
5152
private final EventCallback eventCallback;
53+
private final int maxOrphanStreamIds;
5254
private boolean closingGracefully;
5355
private SetKeyspaceRequest setKeyspaceRequest;
5456
private String logPrefix;
57+
private int orphanStreamIds;
5558

5659
InFlightHandler(
5760
ProtocolVersion protocolVersion,
5861
StreamIdGenerator streamIds,
62+
int maxOrphanStreamIds,
5963
long setKeyspaceTimeoutMillis,
6064
AvailableIdsHolder availableIdsHolder,
65+
ChannelPromise closeStartedFuture,
6166
EventCallback eventCallback,
6267
String ownerLogPrefix) {
6368
this.protocolVersion = protocolVersion;
6469
this.streamIds = streamIds;
70+
this.maxOrphanStreamIds = maxOrphanStreamIds;
71+
this.closeStartedFuture = closeStartedFuture;
6572
this.ownerLogPrefix = ownerLogPrefix;
6673
this.logPrefix = ownerLogPrefix + "|connecting...";
6774
reportAvailableIds();
68-
this.inFlight = Maps.newHashMapWithExpectedSize(streamIds.getMaxAvailableIds());
75+
this.inFlight = HashBiMap.create(streamIds.getMaxAvailableIds());
6976
this.setKeyspaceTimeoutMillis = setKeyspaceTimeoutMillis;
7077
this.availableIdsHolder = availableIdsHolder;
7178
this.eventCallback = eventCallback;
@@ -81,29 +88,27 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
8188
@Override
8289
public void write(ChannelHandlerContext ctx, Object in, ChannelPromise promise) throws Exception {
8390
if (in == DriverChannel.GRACEFUL_CLOSE_MESSAGE) {
84-
if (inFlight.isEmpty()) {
85-
LOG.debug(
86-
"[{}] Received graceful close request and no pending queries, closing now", logPrefix);
87-
ctx.channel().close();
88-
} else {
89-
LOG.debug(
90-
"[{}] Received graceful close request, waiting for pending queries to complete",
91-
logPrefix);
92-
closingGracefully = true;
93-
}
94-
return;
91+
LOG.debug("[{}] Received graceful close request", logPrefix);
92+
startGracefulShutdown(ctx);
9593
} else if (in == DriverChannel.FORCEFUL_CLOSE_MESSAGE) {
9694
LOG.debug("[{}] Received forceful close request, aborting pending queries", logPrefix);
9795
abortAllInFlight(new ClosedConnectionException("Channel was force-closed"));
9896
ctx.channel().close();
99-
return;
10097
} else if (in instanceof HeartbeatException) {
10198
abortAllInFlight(
10299
new ClosedConnectionException("Heartbeat query failed", ((HeartbeatException) in)));
103100
ctx.close();
101+
} else if (in instanceof RequestMessage) {
102+
write(ctx, (RequestMessage) in, promise);
103+
} else if (in instanceof ResponseCallback) {
104+
cancel(ctx, (ResponseCallback) in, promise);
105+
} else {
106+
promise.setFailure(
107+
new IllegalArgumentException("Unsupported message type " + in.getClass().getName()));
104108
}
109+
}
105110

106-
assert in instanceof RequestMessage;
111+
private void write(ChannelHandlerContext ctx, RequestMessage message, ChannelPromise promise) {
107112
if (closingGracefully) {
108113
promise.setFailure(new IllegalStateException("Channel is closing"));
109114
return;
@@ -122,7 +127,6 @@ public void write(ChannelHandlerContext ctx, Object in, ChannelPromise promise)
122127

123128
reportAvailableIds();
124129

125-
RequestMessage message = (RequestMessage) in;
126130
LOG.debug("[{}] Writing {} on stream id {}", logPrefix, message.responseCallback, streamId);
127131
Frame frame =
128132
Frame.forRequest(
@@ -144,6 +148,48 @@ public void write(ChannelHandlerContext ctx, Object in, ChannelPromise promise)
144148
}
145149
}
146150

151+
private void cancel(
152+
ChannelHandlerContext ctx, ResponseCallback responseCallback, ChannelPromise promise) {
153+
Integer streamId = inFlight.inverse().remove(responseCallback);
154+
if (streamId == null) {
155+
LOG.debug(
156+
"[{}] Received cancellation request for unknown callback {}, skipping",
157+
logPrefix,
158+
responseCallback);
159+
} else {
160+
LOG.debug(
161+
"[{}] Cancelled callback {} for stream id {}", logPrefix, responseCallback, streamId);
162+
if (closingGracefully && inFlight.isEmpty()) {
163+
LOG.debug("[{}] Last pending query was cancelled, closing channel", logPrefix);
164+
ctx.channel().close();
165+
} else {
166+
// We can't release the stream id, because a response might still come back from the server.
167+
// Keep track of how many of those ids are held, because we want to replace the channel if
168+
// it becomes too high.
169+
orphanStreamIds += 1;
170+
if (orphanStreamIds > maxOrphanStreamIds) {
171+
LOG.debug(
172+
"[{}] Orphan stream ids exceeded the configured threshold ({}), closing gracefully",
173+
logPrefix,
174+
maxOrphanStreamIds);
175+
startGracefulShutdown(ctx);
176+
}
177+
}
178+
}
179+
promise.setSuccess();
180+
}
181+
182+
private void startGracefulShutdown(ChannelHandlerContext ctx) {
183+
if (inFlight.isEmpty()) {
184+
LOG.debug("[{}] No pending queries, completing graceful shutdown now", logPrefix);
185+
ctx.channel().close();
186+
} else {
187+
LOG.debug("[{}] There are pending queries, delaying graceful shutdown", logPrefix);
188+
closingGracefully = true;
189+
closeStartedFuture.setSuccess();
190+
}
191+
}
192+
147193
@Override
148194
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
149195
Frame responseFrame = (Frame) msg;
@@ -163,12 +209,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
163209
}
164210
} else {
165211
ResponseCallback responseCallback = inFlight.get(streamId);
166-
LOG.debug(
167-
"[{}] Got response on stream id {}, completing {}",
168-
logPrefix,
169-
streamId,
170-
responseCallback);
171-
if (responseCallback != null) {
212+
if (responseCallback == null) {
213+
LOG.debug("[{}] Got response on orphan stream id {}, releasing", logPrefix, streamId);
214+
release(streamId, ctx);
215+
orphanStreamIds -= 1;
216+
} else {
217+
LOG.debug(
218+
"[{}] Got response on stream id {}, completing {}",
219+
logPrefix,
220+
streamId,
221+
responseCallback);
172222
if (!responseCallback.holdStreamId()) {
173223
release(streamId, ctx);
174224
}

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ResponseCallback.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
* The outcome of a request sent to a Cassandra node.
2222
*
2323
* <p>This comes into play after the request has been successfully written to the channel.
24+
*
25+
* <p>Due to internal implementation constraints, different instances of this type must not be equal
26+
* to each other (they are stored in a {@code BiMap} in {@link InFlightHandler}); reference equality
27+
* should be appropriate in all cases.
2428
*/
2529
public interface ResponseCallback {
2630

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class CqlPrepareHandler
7373
private final RetryPolicy retryPolicy;
7474
private final Boolean prepareOnAllNodes;
7575
private final CqlPrepareProcessor processor;
76+
private volatile InitialPrepareCallback initialCallback;
7677

7778
// The errors on the nodes that were already tried (lazily initialized on the first error).
7879
// We don't use a map because nodes can appear multiple times.
@@ -124,7 +125,12 @@ public PreparedStatement syncResult() {
124125
private ScheduledFuture<?> scheduleTimeout(Duration timeout) {
125126
if (timeout.toNanos() > 0) {
126127
return scheduler.schedule(
127-
() -> setFinalError(new DriverTimeoutException("Query timed out after " + timeout)),
128+
() -> {
129+
setFinalError(new DriverTimeoutException("Query timed out after " + timeout));
130+
if (initialCallback != null) {
131+
initialCallback.cancel();
132+
}
133+
},
128134
timeout.toNanos(),
129135
TimeUnit.NANOSECONDS);
130136
} else {
@@ -154,7 +160,8 @@ private void sendRequest(Node node, int retryCount) {
154160
if (channel == null) {
155161
setFinalError(AllNodesFailedException.fromErrors(this.errors));
156162
} else {
157-
InitialPrepareCallback initialPrepareCallback = new InitialPrepareCallback(node, retryCount);
163+
InitialPrepareCallback initialPrepareCallback =
164+
new InitialPrepareCallback(node, channel, retryCount);
158165
channel
159166
.write(message, false, Frame.NO_PAYLOAD, initialPrepareCallback)
160167
.addListener(initialPrepareCallback);
@@ -254,12 +261,14 @@ private void setFinalError(Throwable error) {
254261
private class InitialPrepareCallback
255262
implements ResponseCallback, GenericFutureListener<Future<java.lang.Void>> {
256263
private final Node node;
264+
private final DriverChannel channel;
257265
// How many times we've invoked the retry policy and it has returned a "retry" decision (0 for
258266
// the first attempt of each execution).
259267
private final int retryCount;
260268

261-
private InitialPrepareCallback(Node node, int retryCount) {
269+
private InitialPrepareCallback(Node node, DriverChannel channel, int retryCount) {
262270
this.node = node;
271+
this.channel = channel;
263272
this.retryCount = retryCount;
264273
}
265274

@@ -275,7 +284,13 @@ public void operationComplete(Future<java.lang.Void> future) throws Exception {
275284
recordError(node, future.cause());
276285
sendRequest(null, retryCount); // try next host
277286
} else {
278-
LOG.debug("[{}] Request sent to {}", logPrefix, node);
287+
if (result.isDone()) {
288+
// Might happen if the timeout just fired
289+
cancel();
290+
} else {
291+
LOG.debug("[{}] Request sent to {}", logPrefix, node);
292+
initialCallback = this;
293+
}
279294
}
280295
}
281296

@@ -365,6 +380,16 @@ public void onFailure(Throwable error) {
365380
processRetryDecision(decision, error);
366381
}
367382

383+
public void cancel() {
384+
try {
385+
if (!channel.closeFuture().isDone()) {
386+
this.channel.cancel(this);
387+
}
388+
} catch (Throwable t) {
389+
LOG.warn("[{}] Error cancelling", logPrefix, t);
390+
}
391+
}
392+
368393
@Override
369394
public String toString() {
370395
return logPrefix;

0 commit comments

Comments
 (0)