Skip to content

Commit c3d2c1e

Browse files
committed
Add frame length integration test
1 parent e43a5e9 commit c3d2c1e

8 files changed

Lines changed: 217 additions & 31 deletions

File tree

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (C) 2017-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.api.core.connection;
17+
18+
import com.datastax.oss.driver.api.core.DriverException;
19+
import java.net.SocketAddress;
20+
21+
/**
22+
* Thrown when an incoming or outgoing protocol frame exceeds the limit defined by {@code
23+
* protocol.max-frame-length} in the configuration.
24+
*
25+
* <p>This error is always rethrown directly to the client, without any retry attempt.
26+
*/
27+
public class FrameTooLongException extends DriverException {
28+
29+
private final SocketAddress address;
30+
31+
public FrameTooLongException(SocketAddress address, String message) {
32+
super(message, null, false);
33+
this.address = address;
34+
}
35+
36+
/** The address of the node that encountered the error. */
37+
public SocketAddress getAddress() {
38+
return address;
39+
}
40+
41+
@Override
42+
public DriverException copy() {
43+
return new FrameTooLongException(address, getMessage());
44+
}
45+
}

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ protected void initChannel(Channel channel) throws Exception {
209209
.map(f -> f.newSslHandler(channel, address))
210210
.map(h -> pipeline.addLast("ssl", h));
211211
pipeline
212-
.addLast("encoder", new FrameEncoder(context.frameCodec()))
212+
.addLast("encoder", new FrameEncoder(context.frameCodec(), maxFrameLength))
213213
.addLast("decoder", new FrameDecoder(context.frameCodec(), maxFrameLength))
214214
// Note: HeartbeatHandler is inserted here once init completes
215215
.addLast("inflight", inFlightHandler)

core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,16 @@ private void write(ChannelHandlerContext ctx, RequestMessage message, ChannelPro
140140

141141
inFlight.put(streamId, message.responseCallback);
142142
ChannelFuture writeFuture = ctx.write(frame, promise);
143-
if (message.responseCallback.holdStreamId()) {
144-
writeFuture.addListener(
145-
future -> {
146-
if (future.isSuccess()) {
143+
writeFuture.addListener(
144+
future -> {
145+
if (future.isSuccess()) {
146+
if (message.responseCallback.holdStreamId()) {
147147
message.responseCallback.onStreamIdAssigned(streamId);
148148
}
149-
});
150-
}
149+
} else {
150+
release(streamId, ctx);
151+
}
152+
});
151153
}
152154

153155
private void cancel(
@@ -240,30 +242,30 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
240242

241243
/** Called if an exception was thrown while processing an inbound event (i.e. a response). */
242244
@Override
243-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
244-
if (cause instanceof FrameDecodingException) {
245-
int streamId = ((FrameDecodingException) cause).streamId;
245+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable exception) throws Exception {
246+
if (exception instanceof FrameDecodingException) {
247+
int streamId = ((FrameDecodingException) exception).streamId;
246248
LOG.debug("[{}] Error while decoding response on stream id {}", logPrefix, streamId);
247249
if (streamId >= 0) {
248250
// We know which request matches the failing response, fail that one only
249251
ResponseCallback responseCallback = release(streamId, ctx);
250252
try {
251-
responseCallback.onFailure(cause.getCause());
253+
responseCallback.onFailure(exception.getCause());
252254
} catch (Throwable t) {
253255
LOG.warn("[{}] Unexpected error while invoking failure handler", logPrefix, t);
254256
}
255257
} else {
256258
LOG.warn(
257259
"[{}] Unexpected error while decoding incoming event frame",
258260
logPrefix,
259-
cause.getCause());
261+
exception.getCause());
260262
}
261263
} else {
262264
// Otherwise fail all pending requests
263265
abortAllInFlight(
264-
(cause instanceof HeartbeatException)
265-
? (HeartbeatException) cause
266-
: new ClosedConnectionException("Unexpected error on channel", cause));
266+
(exception instanceof HeartbeatException)
267+
? (HeartbeatException) exception
268+
: new ClosedConnectionException("Unexpected error on channel", exception));
267269
ctx.close();
268270
}
269271
}

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.datastax.oss.driver.api.core.CqlIdentifier;
2020
import com.datastax.oss.driver.api.core.DriverTimeoutException;
2121
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
22+
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
2223
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
2324
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
2425
import com.datastax.oss.driver.api.core.cql.ResultSet;
@@ -56,6 +57,7 @@
5657
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
5758
import com.datastax.oss.protocol.internal.response.result.Void;
5859
import com.datastax.oss.protocol.internal.util.Bytes;
60+
import io.netty.handler.codec.EncoderException;
5961
import io.netty.util.concurrent.EventExecutor;
6062
import io.netty.util.concurrent.Future;
6163
import io.netty.util.concurrent.GenericFutureListener;
@@ -345,13 +347,19 @@ private NodeResponseCallback(
345347
@Override
346348
public void operationComplete(Future<java.lang.Void> future) throws Exception {
347349
if (!future.isSuccess()) {
348-
LOG.debug(
349-
"[{}] Failed to send request on {}, trying next node (cause: {})",
350-
logPrefix,
351-
channel,
352-
future.cause());
353-
recordError(node, future.cause());
354-
sendRequest(null, execution, retryCount); // try next node
350+
Throwable error = future.cause();
351+
if (error instanceof EncoderException
352+
&& error.getCause() instanceof FrameTooLongException) {
353+
setFinalError(error.getCause());
354+
} else {
355+
LOG.debug(
356+
"[{}] Failed to send request on {}, trying next node (cause: {})",
357+
logPrefix,
358+
channel,
359+
error);
360+
recordError(node, error);
361+
sendRequest(null, execution, retryCount); // try next node
362+
}
355363
} else {
356364
LOG.debug("[{}] Request sent on {}", logPrefix, channel);
357365
if (result.isDone()) {
@@ -504,10 +512,12 @@ public void onFailure(Throwable error) {
504512
return;
505513
}
506514
LOG.debug("[{}] Request failure, processing: {}", logPrefix, error.toString());
507-
RetryDecision decision =
508-
isIdempotent
509-
? retryPolicy.onRequestAborted(request, error, retryCount)
510-
: RetryDecision.RETHROW;
515+
RetryDecision decision;
516+
if (!isIdempotent || error instanceof FrameTooLongException) {
517+
decision = RetryDecision.RETHROW;
518+
} else {
519+
decision = retryPolicy.onRequestAborted(request, error, retryCount);
520+
}
511521
processRetryDecision(decision, error);
512522
}
513523

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515
*/
1616
package com.datastax.oss.driver.internal.core.protocol;
1717

18+
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
1819
import com.datastax.oss.protocol.internal.FrameCodec;
1920
import io.netty.buffer.ByteBuf;
2021
import io.netty.channel.ChannelHandlerContext;
2122
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
23+
import io.netty.handler.codec.TooLongFrameException;
2224
import org.slf4j.Logger;
2325
import org.slf4j.LoggerFactory;
2426

@@ -57,6 +59,10 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
5759
LOG.warn("Unexpected error while reading stream id", e1);
5860
streamId = -1;
5961
}
62+
if (e instanceof TooLongFrameException) {
63+
// Translate the Netty error to our own type
64+
e = new FrameTooLongException(ctx.channel().remoteAddress(), e.getMessage());
65+
}
6066
throw new FrameDecodingException(streamId, e);
6167
}
6268
}

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameEncoder.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package com.datastax.oss.driver.internal.core.protocol;
1717

18+
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
1819
import com.datastax.oss.protocol.internal.Frame;
1920
import com.datastax.oss.protocol.internal.FrameCodec;
21+
import io.netty.buffer.ByteBuf;
2022
import io.netty.channel.ChannelHandler;
2123
import io.netty.channel.ChannelHandlerContext;
2224
import io.netty.handler.codec.MessageToMessageEncoder;
@@ -25,14 +27,24 @@
2527
@ChannelHandler.Sharable
2628
public class FrameEncoder extends MessageToMessageEncoder<Frame> {
2729

28-
private final FrameCodec frameCodec;
30+
private final FrameCodec<ByteBuf> frameCodec;
31+
private final int maxFrameLength;
2932

30-
public FrameEncoder(FrameCodec frameCodec) {
33+
public FrameEncoder(FrameCodec<ByteBuf> frameCodec, int maxFrameLength) {
34+
super(Frame.class);
3135
this.frameCodec = frameCodec;
36+
this.maxFrameLength = maxFrameLength;
3237
}
3338

3439
@Override
3540
protected void encode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
36-
out.add(frameCodec.encode(frame));
41+
ByteBuf buffer = frameCodec.encode(frame);
42+
int actualLength = buffer.readableBytes();
43+
if (actualLength > maxFrameLength) {
44+
throw new FrameTooLongException(
45+
ctx.channel().remoteAddress(),
46+
String.format("Outgoing frame length exceeds %d: %d", maxFrameLength, actualLength));
47+
}
48+
out.add(buffer);
3749
}
3850
}

core/src/test/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.oss.driver.internal.core.protocol;
1717

18+
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
1819
import com.datastax.oss.driver.internal.core.channel.ChannelHandlerTestBase;
1920
import com.datastax.oss.driver.internal.core.util.ByteBufs;
2021
import com.datastax.oss.protocol.internal.Compressor;
@@ -23,7 +24,6 @@
2324
import com.datastax.oss.protocol.internal.response.AuthSuccess;
2425
import io.netty.buffer.ByteBuf;
2526
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
26-
import io.netty.handler.codec.TooLongFrameException;
2727
import org.junit.Before;
2828
import org.junit.Test;
2929

@@ -92,7 +92,7 @@ public void should_fail_to_decode_if_payload_is_valid_but_too_long() {
9292
} catch (FrameDecodingException e) {
9393
// Then
9494
assertThat(e.streamId).isEqualTo(42);
95-
assertThat(e.getCause()).isInstanceOf(TooLongFrameException.class);
95+
assertThat(e.getCause()).isInstanceOf(FrameTooLongException.class);
9696
}
9797
}
9898

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2017-2017 DataStax Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.api.core.connection;
17+
18+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
19+
import com.datastax.oss.driver.api.core.config.DriverOption;
20+
import com.datastax.oss.driver.api.core.context.DriverContext;
21+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
22+
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
23+
import com.datastax.oss.driver.api.core.retry.DefaultRetryPolicy;
24+
import com.datastax.oss.driver.api.core.retry.RetryDecision;
25+
import com.datastax.oss.driver.api.core.session.Request;
26+
import com.datastax.oss.driver.api.testinfra.cluster.ClusterRule;
27+
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
28+
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
29+
import java.nio.Buffer;
30+
import java.nio.ByteBuffer;
31+
import java.util.concurrent.CompletionStage;
32+
import java.util.concurrent.TimeUnit;
33+
import org.junit.Before;
34+
import org.junit.ClassRule;
35+
import org.junit.Test;
36+
37+
import static com.datastax.oss.simulacron.common.stubbing.PrimeDsl.noRows;
38+
import static com.datastax.oss.simulacron.common.stubbing.PrimeDsl.rows;
39+
import static com.datastax.oss.simulacron.common.stubbing.PrimeDsl.when;
40+
import static org.assertj.core.api.Assertions.assertThat;
41+
import static org.assertj.core.api.Assertions.fail;
42+
43+
public class FrameLengthIT {
44+
public static @ClassRule SimulacronRule simulacron =
45+
new SimulacronRule(ClusterSpec.builder().withNodes(1));
46+
47+
@ClassRule
48+
public static ClusterRule cluster =
49+
new ClusterRule(
50+
simulacron,
51+
"load-balancing-policy.class = com.datastax.oss.driver.api.testinfra.loadbalancing.SortingLoadBalancingPolicy",
52+
"request.retry-policy.class = \"com.datastax.oss.driver.api.core.connection.FrameLengthIT$AlwaysRetryAbortedPolicy\"",
53+
"protocol.max-frame-length = 100 kilobytes");
54+
55+
private static final SimpleStatement LARGE_QUERY =
56+
SimpleStatement.newInstance("select * from foo").setIdempotent(true);
57+
private static final SimpleStatement SLOW_QUERY =
58+
SimpleStatement.newInstance("select * from bar");
59+
60+
private static final Buffer ONE_HUNDRED_KB = ByteBuffer.allocate(100 * 1024).limit(100 * 1024);
61+
62+
@Before
63+
public void primeQueries() {
64+
simulacron
65+
.cluster()
66+
.prime(
67+
when(LARGE_QUERY.getQuery())
68+
.then(rows().row("result", ONE_HUNDRED_KB).columnTypes("result", "blob").build()));
69+
simulacron
70+
.cluster()
71+
.prime(when(SLOW_QUERY.getQuery()).then(noRows()).delay(60, TimeUnit.SECONDS));
72+
}
73+
74+
@Test(expected = FrameTooLongException.class)
75+
public void should_fail_if_request_exceeds_max_frame_length() {
76+
cluster
77+
.session()
78+
.execute(SimpleStatement.newInstance("insert into foo (k) values (?)", ONE_HUNDRED_KB));
79+
}
80+
81+
@Test
82+
public void should_fail_if_response_exceeds_max_frame_length() {
83+
CompletionStage<AsyncResultSet> slowResultFuture = cluster.session().executeAsync(SLOW_QUERY);
84+
try {
85+
cluster.session().execute(LARGE_QUERY);
86+
fail("Expected a " + FrameTooLongException.class.getSimpleName());
87+
} catch (FrameTooLongException e) {
88+
// expected
89+
}
90+
// Check that the error does not abort other requests on the same connection
91+
assertThat(slowResultFuture.toCompletableFuture()).isNotCompleted();
92+
}
93+
94+
/**
95+
* A retry policy that always retries aborted requests.
96+
*
97+
* <p>We use this to validate that {@link FrameTooLongException} is never passed to the policy (if
98+
* it were, then this policy would retry it, and the exception thrown to the client would be an
99+
* {@link AllNodesFailedException}).
100+
*/
101+
public static class AlwaysRetryAbortedPolicy extends DefaultRetryPolicy {
102+
public AlwaysRetryAbortedPolicy(DriverContext context, DriverOption configRoot) {
103+
super(context, configRoot);
104+
}
105+
106+
@Override
107+
public RetryDecision onRequestAborted(Request request, Throwable error, int retryCount) {
108+
return RetryDecision.RETRY_NEXT;
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)