Skip to content

Commit 2b9bd6c

Browse files
authored
core: delay retriable stream master listener close until all sub streams are closed (#9754)
This helps to prevent retryable stream from using calloptions.executor when it shouldn't, e.g. call is already notified closed. It is done by delaying notifying upper layer (through masterListener).
1 parent ce86090 commit 2b9bd6c

2 files changed

Lines changed: 52 additions & 18 deletions

File tree

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void uncaughtException(Thread t, Throwable e) {
108108
private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean();
109109
private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger();
110110
private final AtomicInteger inFlightSubStreams = new AtomicInteger();
111-
private Status savedCancellationReason;
111+
private SavedCloseMasterListenerReason savedCloseMasterListenerReason;
112112

113113
// Used for recording the share of buffer used for the current call out of the channel buffer.
114114
// This field would not be necessary if there is no channel buffer limit.
@@ -222,9 +222,10 @@ private void commitAndRun(Substream winningSubstream) {
222222
}
223223
}
224224

225-
@Nullable // returns null when cancelled
225+
// returns null means we should not create new sub streams, e.g. cancelled or
226+
// other close condition is met for retriableStream.
227+
@Nullable
226228
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
227-
// increment only when >= 0, i.e. not cancelled
228229
int inFlight;
229230
do {
230231
inFlight = inFlightSubStreams.get();
@@ -506,11 +507,8 @@ public final void cancel(final Status reason) {
506507
Runnable runnable = commit(noopSubstream);
507508

508509
if (runnable != null) {
509-
savedCancellationReason = reason;
510510
runnable.run();
511-
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
512-
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
513-
}
511+
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
514512
return;
515513
}
516514

@@ -816,14 +814,30 @@ private void freezeHedging() {
816814
}
817815

818816
private void safeCloseMasterListener(Status status, RpcProgress progress, Metadata metadata) {
819-
listenerSerializeExecutor.execute(
820-
new Runnable() {
821-
@Override
822-
public void run() {
823-
isClosed = true;
824-
masterListener.closed(status, progress, metadata);
825-
}
826-
});
817+
savedCloseMasterListenerReason = new SavedCloseMasterListenerReason(status, progress,
818+
metadata);
819+
if (inFlightSubStreams.addAndGet(Integer.MIN_VALUE) == Integer.MIN_VALUE) {
820+
listenerSerializeExecutor.execute(
821+
new Runnable() {
822+
@Override
823+
public void run() {
824+
isClosed = true;
825+
masterListener.closed(status, progress, metadata);
826+
}
827+
});
828+
}
829+
}
830+
831+
private static final class SavedCloseMasterListenerReason {
832+
private final Status status;
833+
private final RpcProgress progress;
834+
private final Metadata metadata;
835+
836+
SavedCloseMasterListenerReason(Status status, RpcProgress progress, Metadata metadata) {
837+
this.status = status;
838+
this.progress = progress;
839+
this.metadata = metadata;
840+
}
827841
}
828842

829843
private interface BufferEntry {
@@ -864,8 +878,17 @@ public void closed(
864878
}
865879

866880
if (inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
867-
assert savedCancellationReason != null;
868-
safeCloseMasterListener(savedCancellationReason, RpcProgress.PROCESSED, new Metadata());
881+
assert savedCloseMasterListenerReason != null;
882+
listenerSerializeExecutor.execute(
883+
new Runnable() {
884+
@Override
885+
public void run() {
886+
isClosed = true;
887+
masterListener.closed(savedCloseMasterListenerReason.status,
888+
savedCloseMasterListenerReason.progress,
889+
savedCloseMasterListenerReason.metadata);
890+
}
891+
});
869892
return;
870893
}
871894

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2151,14 +2151,19 @@ public Void answer(InvocationOnMock in) {
21512151
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
21522152
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
21532153
inOrder.verify(retriableStreamRecorder).postCommit();
2154+
sublistenerCaptor1.getValue().closed(
2155+
Status.CANCELLED, PROCESSED, new Metadata());
2156+
sublistenerCaptor4.getValue().closed(
2157+
Status.CANCELLED, PROCESSED, new Metadata());
21542158
inOrder.verify(masterListener).closed(
21552159
any(Status.class), any(RpcProgress.class), any(Metadata.class));
21562160
inOrder.verifyNoMoreInteractions();
21572161

21582162
insight = new InsightBuilder();
21592163
hedgingStream.appendTimeoutInsight(insight);
21602164
assertThat(insight.toString()).isEqualTo(
2161-
"[closed=[UNAVAILABLE, INTERNAL], committed=[remote_addr=2.2.2.2:81]]");
2165+
"[closed=[UNAVAILABLE, INTERNAL, CANCELLED, CANCELLED], "
2166+
+ "committed=[remote_addr=2.2.2.2:81]]");
21622167
}
21632168

21642169
@Test
@@ -2425,6 +2430,7 @@ public void hedging_pushback_positive() {
24252430
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
24262431
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
24272432
inOrder.verify(retriableStreamRecorder).postCommit();
2433+
sublistenerCaptor3.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
24282434
inOrder.verify(masterListener).closed(fatal, PROCESSED, metadata);
24292435
inOrder.verifyNoMoreInteractions();
24302436
}
@@ -2605,6 +2611,8 @@ public void hedging_transparentRetry() {
26052611
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
26062612
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
26072613
verify(retriableStreamRecorder).postCommit();
2614+
sublistenerCaptor1.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
2615+
sublistenerCaptor4.getValue().closed(Status.CANCELLED, PROCESSED, metadata);
26082616
verify(masterListener).closed(status, REFUSED, metadata);
26092617
}
26102618

@@ -2645,6 +2653,9 @@ public void hedging_transparentRetryNotAllowed() {
26452653
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
26462654
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
26472655
verify(retriableStreamRecorder).postCommit();
2656+
sublistenerCaptor1.getValue()
2657+
.closed(Status.CANCELLED, REFUSED, new Metadata());
2658+
//master listener close should wait until all substreams are closed
26482659
verify(masterListener).closed(status, REFUSED, metadata);
26492660
}
26502661

0 commit comments

Comments
 (0)