Skip to content

Commit 671834f

Browse files
authored
core: retry part 3: use call executor and sceduled executor service
Use call executor and scheduled executor service to schedule and run `retry()`. Backoff amount computation from retry policy and testcases during backoff will be added in future PRs.
1 parent cf4a38e commit 671834f

File tree

3 files changed

+67
-19
lines changed

3 files changed

+67
-19
lines changed

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,8 @@ public <ReqT> RetriableStream<ReqT> newRetriableStream(
433433
final Metadata headers,
434434
final Context context) {
435435
return new RetriableStream<ReqT>(
436-
method, channelBufferUsed, perRpcBufferLimit, channelBufferLimit) {
436+
method, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
437+
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService()) {
437438
@Override
438439
Status prestart() {
439440
return uncommittedRetriableStreamsRegistry.add(this);
@@ -647,17 +648,21 @@ public String authority() {
647648
return interceptorChannel.authority();
648649
}
649650

651+
private Executor getCallExecutor(CallOptions callOptions) {
652+
Executor executor = callOptions.getExecutor();
653+
if (executor == null) {
654+
executor = this.executor;
655+
}
656+
return executor;
657+
}
658+
650659
private class RealChannel extends Channel {
651660
@Override
652661
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
653662
CallOptions callOptions) {
654-
Executor executor = callOptions.getExecutor();
655-
if (executor == null) {
656-
executor = ManagedChannelImpl.this.executor;
657-
}
658663
return new ClientCallImpl<ReqT, RespT>(
659664
method,
660-
executor,
665+
getCallExecutor(callOptions),
661666
callOptions,
662667
transportProvider,
663668
terminated ? null : transportFactory.getScheduledExecutorService(),

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
import java.util.HashSet;
3535
import java.util.List;
3636
import java.util.Set;
37+
import java.util.concurrent.Executor;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.ScheduledExecutorService;
40+
import java.util.concurrent.TimeUnit;
3741
import java.util.concurrent.atomic.AtomicLong;
3842
import javax.annotation.CheckReturnValue;
3943
import javax.annotation.Nullable;
@@ -45,6 +49,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
4549
Status.CANCELLED.withDescription("Stream thrown away because RetriableStream committed");
4650

4751
private final MethodDescriptor<ReqT, ?> method;
52+
private final Executor callExecutor;
53+
private final ScheduledExecutorService scheduledExecutorService;
4854

4955
/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
5056
private final Object lock = new Object();
@@ -62,14 +68,18 @@ abstract class RetriableStream<ReqT> implements ClientStream {
6268
private long perRpcBufferUsed;
6369

6470
private ClientStreamListener masterListener;
71+
private Future<?> scheduledRetry;
6572

6673
RetriableStream(
6774
MethodDescriptor<ReqT, ?> method, ChannelBufferMeter channelBufferUsed,
68-
long perRpcBufferLimit, long channelBufferLimit) {
75+
long perRpcBufferLimit, long channelBufferLimit,
76+
Executor callExecutor, ScheduledExecutorService scheduledExecutorService) {
6977
this.method = method;
7078
this.channelBufferUsed = channelBufferUsed;
7179
this.perRpcBufferLimit = perRpcBufferLimit;
7280
this.channelBufferLimit = channelBufferLimit;
81+
this.callExecutor = callExecutor;
82+
this.scheduledExecutorService = scheduledExecutorService;
7383
}
7484

7585
@Nullable // null if already committed
@@ -240,6 +250,11 @@ public final void cancel(Status reason) {
240250
Runnable runnable = commit(noopSubstream);
241251

242252
if (runnable != null) {
253+
Future<?> savedScheduledRetry = scheduledRetry;
254+
if (savedScheduledRetry != null) {
255+
savedScheduledRetry.cancel(false);
256+
scheduledRetry = null;
257+
}
243258
masterListener.closed(reason, new Metadata());
244259
runnable.run();
245260
return;
@@ -492,8 +507,23 @@ public void closed(Status status, Metadata trailers) {
492507
if (state.winningSubstream == null && shouldRetry()) {
493508
// The check state.winningSubstream == null, checking if is not already committed, is racy,
494509
// but is still safe b/c the retry will also handle committed/cancellation
495-
// TODO(zdapeng): backoff and schedule; retry() should run in an executor
496-
retry();
510+
// TODO(zdapeng): compute backoff
511+
long backoffInMillis = 0L;
512+
scheduledRetry = scheduledExecutorService.schedule(
513+
new Runnable() {
514+
@Override
515+
public void run() {
516+
scheduledRetry = null;
517+
callExecutor.execute(new Runnable() {
518+
@Override
519+
public void run() {
520+
retry();
521+
}
522+
});
523+
}
524+
},
525+
backoffInMillis,
526+
TimeUnit.MILLISECONDS);
497527
} else if (!hasHedging()) {
498528
commitAndRun(substream);
499529
if (state.winningSubstream == substream) {

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.mockito.Mockito.verifyNoMoreInteractions;
3232
import static org.mockito.Mockito.when;
3333

34+
import com.google.common.util.concurrent.MoreExecutors;
3435
import io.grpc.CallOptions;
3536
import io.grpc.ClientStreamTracer;
3637
import io.grpc.Codec;
@@ -77,9 +78,11 @@ public class RetriableStreamTest {
7778
.setResponseMarshaller(new StringMarshaller())
7879
.build();
7980
private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
81+
private final FakeClock fakeClock = new FakeClock();
8082
private final RetriableStream<String> retriableStream =
8183
new RetriableStream<String>(
82-
method, channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT) {
84+
method, channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
85+
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService()) {
8386
@Override
8487
void postCommit() {
8588
retriableStreamRecorder.postCommit();
@@ -168,7 +171,8 @@ public void retry_everythingDrained() {
168171
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
169172

170173
inOrder.verify(retriableStreamRecorder).shouldRetry();
171-
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker
174+
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
175+
fakeClock.forwardNanos(0L);
172176
inOrder.verify(retriableStreamRecorder).newSubstream();
173177
inOrder.verify(mockStream2).setAuthority(AUTHORITY);
174178
inOrder.verify(mockStream2).setCompressor(COMPRESSOR);
@@ -207,7 +211,8 @@ public void retry_everythingDrained() {
207211
sublistenerCaptor2.getValue().closed(Status.UNAVAILABLE, new Metadata());
208212

209213
inOrder.verify(retriableStreamRecorder).shouldRetry();
210-
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker
214+
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
215+
fakeClock.forwardNanos(0L);
211216
inOrder.verify(retriableStreamRecorder).newSubstream();
212217
inOrder.verify(mockStream3).setAuthority(AUTHORITY);
213218
inOrder.verify(mockStream3).setCompressor(COMPRESSOR);
@@ -274,11 +279,12 @@ public void retry_headersRead_cancel() {
274279
verify(mockStream1).start(sublistenerCaptor1.capture());
275280

276281
// retry
277-
// TODO(zdapeng): forward backoff ticker
282+
// TODO(zdapeng): forward backoff ticker w/ right amount
278283
doReturn(true).when(retriableStreamRecorder).shouldRetry();
279284
ClientStream mockStream2 = mock(ClientStream.class);
280285
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream();
281286
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
287+
fakeClock.forwardNanos(0L);
282288

283289
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
284290
ArgumentCaptor.forClass(ClientStreamListener.class);
@@ -333,11 +339,12 @@ public void retry_headersRead_closed() {
333339
verify(mockStream1).start(sublistenerCaptor1.capture());
334340

335341
// retry
336-
// TODO(zdapeng): forward backoff ticker
342+
// TODO(zdapeng): forward backoff ticker w/ right amount
337343
doReturn(true).when(retriableStreamRecorder).shouldRetry();
338344
ClientStream mockStream2 = mock(ClientStream.class);
339345
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream();
340346
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
347+
fakeClock.forwardNanos(0L);
341348

342349
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
343350
ArgumentCaptor.forClass(ClientStreamListener.class);
@@ -402,11 +409,12 @@ public void retry_cancel_closed() {
402409
verify(mockStream1).start(sublistenerCaptor1.capture());
403410

404411
// retry
405-
// TODO(zdapeng): forward backoff ticker
412+
// TODO(zdapeng): forward backoff ticker w/ right amount
406413
doReturn(true).when(retriableStreamRecorder).shouldRetry();
407414
ClientStream mockStream2 = mock(ClientStream.class);
408415
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream();
409416
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
417+
fakeClock.forwardNanos(0L);
410418

411419
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
412420
ArgumentCaptor.forClass(ClientStreamListener.class);
@@ -468,11 +476,13 @@ public void retry_unretriableClosed_cancel() {
468476
verify(mockStream1).start(sublistenerCaptor1.capture());
469477

470478
// retry
471-
// TODO(zdapeng): forward backoff ticker
479+
// TODO(zdapeng): forward backoff ticker w/ right amount
480+
fakeClock.forwardNanos(0L);
472481
doReturn(true).when(retriableStreamRecorder).shouldRetry();
473482
ClientStream mockStream2 = mock(ClientStream.class);
474483
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream();
475484
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
485+
fakeClock.forwardNanos(0L);
476486

477487
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
478488
ArgumentCaptor.forClass(ClientStreamListener.class);
@@ -562,10 +572,11 @@ public void request(int numMessages) {
562572
inOrder.verify(mockStream1).writeMessage(any(InputStream.class)); // msg "substream1 request 1"
563573

564574
// retry
565-
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker
575+
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
566576
doReturn(true).when(retriableStreamRecorder).shouldRetry();
567577
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream();
568578
sublistenerCaptor1.getValue().closed(Status.UNAVAILABLE, new Metadata());
579+
fakeClock.forwardNanos(0L);
569580

570581
inOrder.verify(mockStream2).start(sublistenerCaptor2.get());
571582

@@ -670,11 +681,12 @@ public boolean isReady() {
670681
readiness.add(retriableStream.isReady()); // expected true
671682

672683
// retry
673-
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker
684+
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
674685
doReturn(true).when(retriableStreamRecorder).shouldRetry();
675686
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream();
676687
doReturn(false).when(mockStream1).isReady(); // mockStream1 closed, so isReady false
677688
sublistenerCaptor1.get().closed(Status.UNAVAILABLE, new Metadata());
689+
fakeClock.forwardNanos(0L);
678690

679691
verify(mockStream2).start(any(ClientStreamListener.class));
680692
readiness.add(retriableStream.isReady()); // expected true
@@ -730,8 +742,9 @@ public void start(ClientStreamListener listener) {
730742
ClientStreamListener listener1 = sublistenerCaptor1.getValue();
731743

732744
// retry
733-
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker
745+
// TODO(zdapeng): send more messages during backoff, then forward backoff ticker w/ right amount
734746
listener1.closed(Status.UNAVAILABLE, new Metadata());
747+
fakeClock.forwardNanos(0L);
735748

736749
retriableStream.request(1);
737750
verify(mockStream1, never()).request(1);

0 commit comments

Comments
 (0)