Skip to content

Commit dc95465

Browse files
authored
core: retry part 5, add RetryPolicy data object
1 parent 722d6f0 commit dc95465

File tree

5 files changed

+345
-3
lines changed

5 files changed

+345
-3
lines changed

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static io.grpc.ConnectivityState.IDLE;
2323
import static io.grpc.ConnectivityState.SHUTDOWN;
2424
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25+
import static io.grpc.internal.RetriableStream.RetryPolicy.DEFAULT;
2526

2627
import com.google.common.annotations.VisibleForTesting;
2728
import com.google.common.base.Stopwatch;
@@ -53,6 +54,7 @@
5354
import io.grpc.internal.Channelz.ChannelStats;
5455
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
5556
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
57+
import io.grpc.internal.RetriableStream.RetryPolicy;
5658
import java.lang.ref.Reference;
5759
import java.lang.ref.ReferenceQueue;
5860
import java.lang.ref.SoftReference;
@@ -75,6 +77,7 @@
7577
import java.util.logging.LogRecord;
7678
import java.util.logging.Logger;
7779
import java.util.regex.Pattern;
80+
import javax.annotation.Nonnull;
7881
import javax.annotation.Nullable;
7982
import javax.annotation.concurrent.GuardedBy;
8083
import javax.annotation.concurrent.ThreadSafe;
@@ -205,6 +208,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
205208
private final long perRpcBufferLimit;
206209
private final long channelBufferLimit;
207210

211+
private RetryPolicies retryPolicies;
212+
// Temporary false flag that can skip the retry code path.
213+
private boolean retryEnabled;
214+
208215
// Called from channelExecutor
209216
private final ManagedClientTransport.Listener delayedTransportListener =
210217
new ManagedClientTransport.Listener() {
@@ -429,9 +436,11 @@ public <ReqT> RetriableStream<ReqT> newRetriableStream(
429436
final CallOptions callOptions,
430437
final Metadata headers,
431438
final Context context) {
439+
RetryPolicy retryPolicy = retryPolicies == null ? DEFAULT : retryPolicies.get(method);
432440
return new RetriableStream<ReqT>(
433441
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
434-
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService()) {
442+
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
443+
retryPolicy) {
435444
@Override
436445
Status prestart() {
437446
return uncommittedRetriableStreamsRegistry.add(this);
@@ -1033,6 +1042,18 @@ public void run() {
10331042
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
10341043
return;
10351044
}
1045+
1046+
try {
1047+
if (retryEnabled) {
1048+
retryPolicies = getRetryPolicies(config);
1049+
}
1050+
} catch (RuntimeException re) {
1051+
logger.log(
1052+
Level.WARNING,
1053+
"[" + getLogId() + "] Unexpected exception from parsing service config",
1054+
re);
1055+
}
1056+
10361057
try {
10371058
balancer.handleResolvedAddressGroups(servers, config);
10381059
} catch (Throwable e) {
@@ -1067,6 +1088,16 @@ public void run() {
10671088
}
10681089
}
10691090

1091+
// TODO(zdapeng): implement it once the Gson dependency issue is resolved.
1092+
private static RetryPolicies getRetryPolicies(Attributes config) {
1093+
return new RetryPolicies() {
1094+
@Override
1095+
public RetryPolicy get(MethodDescriptor<?, ?> method) {
1096+
return RetryPolicy.DEFAULT;
1097+
}
1098+
};
1099+
}
1100+
10701101
private final class SubchannelImpl extends AbstractSubchannel {
10711102
// Set right after SubchannelImpl is created.
10721103
InternalSubchannel subchannel;
@@ -1253,4 +1284,10 @@ private static RuntimeException missingCallSite() {
12531284
return e;
12541285
}
12551286
}
1287+
1288+
@VisibleForTesting
1289+
interface RetryPolicies {
1290+
@Nonnull
1291+
RetryPolicy get(MethodDescriptor<?, ?> method);
1292+
}
12561293
}

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package io.grpc.internal;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
1920
import static com.google.common.base.Preconditions.checkNotNull;
2021
import static com.google.common.base.Preconditions.checkState;
2122

2223
import com.google.common.annotations.VisibleForTesting;
24+
import com.google.common.base.Objects;
2325
import io.grpc.Attributes;
2426
import io.grpc.CallOptions;
2527
import io.grpc.ClientStreamTracer;
@@ -43,6 +45,7 @@
4345
import javax.annotation.CheckReturnValue;
4446
import javax.annotation.Nullable;
4547
import javax.annotation.concurrent.GuardedBy;
48+
import javax.annotation.concurrent.Immutable;
4649

4750
/** A logical {@link ClientStream} that is retriable. */
4851
abstract class RetriableStream<ReqT> implements ClientStream {
@@ -58,6 +61,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
5861
private final ScheduledExecutorService scheduledExecutorService;
5962
// Must not modify it.
6063
private final Metadata headers;
64+
// TODO(zdapeng): add and use its business logic
65+
private final RetryPolicy retryPolicy;
6166

6267
/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
6368
private final Object lock = new Object();
@@ -80,14 +85,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
8085
RetriableStream(
8186
MethodDescriptor<ReqT, ?> method, Metadata headers,
8287
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
83-
Executor callExecutor, ScheduledExecutorService scheduledExecutorService) {
88+
Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
89+
RetryPolicy retryPolicy) {
8490
this.method = method;
8591
this.channelBufferUsed = channelBufferUsed;
8692
this.perRpcBufferLimit = perRpcBufferLimit;
8793
this.channelBufferLimit = channelBufferLimit;
8894
this.callExecutor = callExecutor;
8995
this.scheduledExecutorService = scheduledExecutorService;
9096
this.headers = headers;
97+
this.retryPolicy = checkNotNull(retryPolicy, "retryPolicy");
9198
}
9299

93100
@Nullable // null if already committed
@@ -777,4 +784,57 @@ public long addAndGet(long newBytesUsed) {
777784
return bufferUsed.addAndGet(newBytesUsed);
778785
}
779786
}
787+
788+
@Immutable
789+
static final class RetryPolicy {
790+
private final int maxAttempts;
791+
private final double initialBackoffInSeconds;
792+
private final double maxBackoffInSeconds;
793+
private final double backoffMultiplier;
794+
private final Collection<Status.Code> retryableStatusCodes;
795+
796+
RetryPolicy(
797+
int maxAttempts, double initialBackoffInSeconds, double maxBackoffInSeconds,
798+
double backoffMultiplier, Collection<Status.Code> retryableStatusCodes) {
799+
checkArgument(maxAttempts >= 1, "maxAttempts");
800+
this.maxAttempts = maxAttempts;
801+
checkArgument(initialBackoffInSeconds >= 0D, "initialBackoffInSeconds");
802+
this.initialBackoffInSeconds = initialBackoffInSeconds;
803+
checkArgument(
804+
maxBackoffInSeconds >= initialBackoffInSeconds,
805+
"maxBackoffInSeconds should be at least initialBackoffInSeconds");
806+
this.maxBackoffInSeconds = maxBackoffInSeconds;
807+
checkArgument(backoffMultiplier > 0D, "backoffMultiplier");
808+
this.backoffMultiplier = backoffMultiplier;
809+
this.retryableStatusCodes = Collections.unmodifiableSet(
810+
new HashSet<Status.Code>(checkNotNull(retryableStatusCodes, "retryableStatusCodes")));
811+
}
812+
813+
/** No retry. */
814+
static final RetryPolicy DEFAULT =
815+
new RetryPolicy(1, 0, 0, 1, Collections.<Status.Code>emptyList());
816+
817+
@Override
818+
public boolean equals(Object o) {
819+
if (this == o) {
820+
return true;
821+
}
822+
if (!(o instanceof RetryPolicy)) {
823+
return false;
824+
}
825+
RetryPolicy that = (RetryPolicy) o;
826+
return maxAttempts == that.maxAttempts
827+
&& Double.compare(backoffMultiplier, that.backoffMultiplier) == 0
828+
&& Double.compare(initialBackoffInSeconds, that.initialBackoffInSeconds) == 0
829+
&& Double.compare(maxBackoffInSeconds, that.maxBackoffInSeconds) == 0
830+
&& Objects.equal(retryableStatusCodes, that.retryableStatusCodes);
831+
}
832+
833+
@Override
834+
public int hashCode() {
835+
return Objects.hashCode(
836+
maxAttempts, initialBackoffInSeconds, maxBackoffInSeconds, backoffMultiplier,
837+
retryableStatusCodes);
838+
}
839+
}
780840
}

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,14 @@
4545
import io.grpc.MethodDescriptor;
4646
import io.grpc.MethodDescriptor.MethodType;
4747
import io.grpc.Status;
48+
import io.grpc.Status.Code;
4849
import io.grpc.StringMarshaller;
4950
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
51+
import io.grpc.internal.RetriableStream.RetryPolicy;
5052
import io.grpc.internal.StreamListener.MessageProducer;
5153
import java.io.InputStream;
5254
import java.util.ArrayList;
55+
import java.util.Arrays;
5356
import java.util.List;
5457
import java.util.concurrent.atomic.AtomicReference;
5558
import org.junit.Test;
@@ -86,7 +89,8 @@ public class RetriableStreamTest {
8689
private final RetriableStream<String> retriableStream =
8790
new RetriableStream<String>(
8891
method, new Metadata(),channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
89-
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService()) {
92+
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(),
93+
new RetryPolicy(4, 100D, 300D, 2D, Arrays.asList(Code.UNAVAILABLE, Code.DATA_LOSS))) {
9094
@Override
9195
void postCommit() {
9296
retriableStreamRecorder.postCommit();

0 commit comments

Comments
 (0)