@@ -108,7 +108,7 @@ public void uncaughtException(Thread t, Throwable e) {
108108 private final AtomicBoolean noMoreTransparentRetry = new AtomicBoolean ();
109109 private final AtomicInteger localOnlyTransparentRetries = new AtomicInteger ();
110110 private final AtomicInteger inFlightSubStreams = new AtomicInteger ();
111- private Status savedCancellationReason ;
111+ private SavedCloseMasterListenerReason savedCloseMasterListenerReason ;
112112
113113 // Used for recording the share of buffer used for the current call out of the channel buffer.
114114 // This field would not be necessary if there is no channel buffer limit.
@@ -222,9 +222,10 @@ private void commitAndRun(Substream winningSubstream) {
222222 }
223223 }
224224
225- @ Nullable // returns null when cancelled
225+ // returns null means we should not create new sub streams, e.g. cancelled or
226+ // other close condition is met for retriableStream.
227+ @ Nullable
226228 private Substream createSubstream (int previousAttemptCount , boolean isTransparentRetry ) {
227- // increment only when >= 0, i.e. not cancelled
228229 int inFlight ;
229230 do {
230231 inFlight = inFlightSubStreams .get ();
@@ -506,11 +507,8 @@ public final void cancel(final Status reason) {
506507 Runnable runnable = commit (noopSubstream );
507508
508509 if (runnable != null ) {
509- savedCancellationReason = reason ;
510510 runnable .run ();
511- if (inFlightSubStreams .addAndGet (Integer .MIN_VALUE ) == Integer .MIN_VALUE ) {
512- safeCloseMasterListener (reason , RpcProgress .PROCESSED , new Metadata ());
513- }
511+ safeCloseMasterListener (reason , RpcProgress .PROCESSED , new Metadata ());
514512 return ;
515513 }
516514
@@ -816,14 +814,30 @@ private void freezeHedging() {
816814 }
817815
818816 private void safeCloseMasterListener (Status status , RpcProgress progress , Metadata metadata ) {
819- listenerSerializeExecutor .execute (
820- new Runnable () {
821- @ Override
822- public void run () {
823- isClosed = true ;
824- masterListener .closed (status , progress , metadata );
825- }
826- });
817+ savedCloseMasterListenerReason = new SavedCloseMasterListenerReason (status , progress ,
818+ metadata );
819+ if (inFlightSubStreams .addAndGet (Integer .MIN_VALUE ) == Integer .MIN_VALUE ) {
820+ listenerSerializeExecutor .execute (
821+ new Runnable () {
822+ @ Override
823+ public void run () {
824+ isClosed = true ;
825+ masterListener .closed (status , progress , metadata );
826+ }
827+ });
828+ }
829+ }
830+
831+ private static final class SavedCloseMasterListenerReason {
832+ private final Status status ;
833+ private final RpcProgress progress ;
834+ private final Metadata metadata ;
835+
836+ SavedCloseMasterListenerReason (Status status , RpcProgress progress , Metadata metadata ) {
837+ this .status = status ;
838+ this .progress = progress ;
839+ this .metadata = metadata ;
840+ }
827841 }
828842
829843 private interface BufferEntry {
@@ -864,8 +878,17 @@ public void closed(
864878 }
865879
866880 if (inFlightSubStreams .decrementAndGet () == Integer .MIN_VALUE ) {
867- assert savedCancellationReason != null ;
868- safeCloseMasterListener (savedCancellationReason , RpcProgress .PROCESSED , new Metadata ());
881+ assert savedCloseMasterListenerReason != null ;
882+ listenerSerializeExecutor .execute (
883+ new Runnable () {
884+ @ Override
885+ public void run () {
886+ isClosed = true ;
887+ masterListener .closed (savedCloseMasterListenerReason .status ,
888+ savedCloseMasterListenerReason .progress ,
889+ savedCloseMasterListenerReason .metadata );
890+ }
891+ });
869892 return ;
870893 }
871894
0 commit comments