Skip to content

Commit 0cfc799

Browse files
committed
JAVA-1547: Abort pending requests when connection dropped
1 parent bd5ca14 commit 0cfc799

3 files changed

Lines changed: 37 additions & 0 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.0.0-alpha1 (in progress)
66

7+
- [bug] JAVA-1547: Abort pending requests when connection dropped
78
- [new feature] JAVA-1497: Port timestamp generators from 3.x
89
- [improvement] JAVA-1539: Configure for deployment to Maven central
910
- [new feature] JAVA-1519: Close channel if number of orphan stream ids exceeds a configurable

core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,15 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws E
281281
}
282282
}
283283

284+
@Override
285+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
286+
// If the channel was closed normally (normal or forced shutdown), inFlight is already empty by
287+
// the time we get here. So if it's not, it means the channel closed unexpectedly (e.g. the
288+
// connection was dropped).
289+
abortAllInFlight(new ClosedConnectionException("Lost connection to remote peer"));
290+
super.channelInactive(ctx);
291+
}
292+
284293
private ResponseCallback release(int streamId, ChannelHandlerContext ctx) {
285294
LOG.debug("[{}] Releasing stream id {}", logPrefix, streamId);
286295
ResponseCallback responseCallback = inFlight.remove(streamId);

core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,33 @@ public void should_fail_all_pending_and_close_on_unexpected_inbound_exception()
370370
}
371371
}
372372

373+
@Test
374+
public void should_fail_all_pending_if_connection_lost() {
375+
// Given
376+
addToPipeline();
377+
Mockito.when(streamIds.acquire()).thenReturn(42, 43);
378+
MockResponseCallback responseCallback1 = new MockResponseCallback();
379+
MockResponseCallback responseCallback2 = new MockResponseCallback();
380+
channel
381+
.writeAndFlush(
382+
new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback1))
383+
.awaitUninterruptibly();
384+
channel
385+
.writeAndFlush(
386+
new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback2))
387+
.awaitUninterruptibly();
388+
389+
// When
390+
channel.pipeline().fireChannelInactive();
391+
392+
// Then
393+
for (MockResponseCallback callback : ImmutableList.of(responseCallback1, responseCallback2)) {
394+
assertThat(callback.getFailure())
395+
.isInstanceOf(ClosedConnectionException.class)
396+
.hasMessageContaining("Lost connection to remote peer");
397+
}
398+
}
399+
373400
@Test
374401
public void should_hold_stream_id_if_required() {
375402
// Given

0 commit comments

Comments
 (0)