1616
1717package io .grpc .internal ;
1818
19+ import static com .google .common .base .Preconditions .checkArgument ;
1920import static com .google .common .base .Preconditions .checkNotNull ;
2021import static com .google .common .base .Preconditions .checkState ;
2122
2223import com .google .common .annotations .VisibleForTesting ;
24+ import com .google .common .base .Objects ;
2325import io .grpc .Attributes ;
2426import io .grpc .CallOptions ;
2527import io .grpc .ClientStreamTracer ;
4345import javax .annotation .CheckReturnValue ;
4446import javax .annotation .Nullable ;
4547import javax .annotation .concurrent .GuardedBy ;
48+ import javax .annotation .concurrent .Immutable ;
4649
4750/** A logical {@link ClientStream} that is retriable. */
4851abstract class RetriableStream <ReqT > implements ClientStream {
@@ -58,6 +61,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
5861 private final ScheduledExecutorService scheduledExecutorService ;
5962 // Must not modify it.
6063 private final Metadata headers ;
64+ // TODO(zdapeng): add and use its business logic
65+ private final RetryPolicy retryPolicy ;
6166
6267 /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
6368 private final Object lock = new Object ();
@@ -80,14 +85,16 @@ abstract class RetriableStream<ReqT> implements ClientStream {
8085 RetriableStream (
8186 MethodDescriptor <ReqT , ?> method , Metadata headers ,
8287 ChannelBufferMeter channelBufferUsed , long perRpcBufferLimit , long channelBufferLimit ,
83- Executor callExecutor , ScheduledExecutorService scheduledExecutorService ) {
88+ Executor callExecutor , ScheduledExecutorService scheduledExecutorService ,
89+ RetryPolicy retryPolicy ) {
8490 this .method = method ;
8591 this .channelBufferUsed = channelBufferUsed ;
8692 this .perRpcBufferLimit = perRpcBufferLimit ;
8793 this .channelBufferLimit = channelBufferLimit ;
8894 this .callExecutor = callExecutor ;
8995 this .scheduledExecutorService = scheduledExecutorService ;
9096 this .headers = headers ;
97+ this .retryPolicy = checkNotNull (retryPolicy , "retryPolicy" );
9198 }
9299
93100 @ Nullable // null if already committed
@@ -777,4 +784,57 @@ public long addAndGet(long newBytesUsed) {
777784 return bufferUsed .addAndGet (newBytesUsed );
778785 }
779786 }
787+
788+ @ Immutable
789+ static final class RetryPolicy {
790+ private final int maxAttempts ;
791+ private final double initialBackoffInSeconds ;
792+ private final double maxBackoffInSeconds ;
793+ private final double backoffMultiplier ;
794+ private final Collection <Status .Code > retryableStatusCodes ;
795+
796+ RetryPolicy (
797+ int maxAttempts , double initialBackoffInSeconds , double maxBackoffInSeconds ,
798+ double backoffMultiplier , Collection <Status .Code > retryableStatusCodes ) {
799+ checkArgument (maxAttempts >= 1 , "maxAttempts" );
800+ this .maxAttempts = maxAttempts ;
801+ checkArgument (initialBackoffInSeconds >= 0D , "initialBackoffInSeconds" );
802+ this .initialBackoffInSeconds = initialBackoffInSeconds ;
803+ checkArgument (
804+ maxBackoffInSeconds >= initialBackoffInSeconds ,
805+ "maxBackoffInSeconds should be at least initialBackoffInSeconds" );
806+ this .maxBackoffInSeconds = maxBackoffInSeconds ;
807+ checkArgument (backoffMultiplier > 0D , "backoffMultiplier" );
808+ this .backoffMultiplier = backoffMultiplier ;
809+ this .retryableStatusCodes = Collections .unmodifiableSet (
810+ new HashSet <Status .Code >(checkNotNull (retryableStatusCodes , "retryableStatusCodes" )));
811+ }
812+
813+ /** No retry. */
814+ static final RetryPolicy DEFAULT =
815+ new RetryPolicy (1 , 0 , 0 , 1 , Collections .<Status .Code >emptyList ());
816+
817+ @ Override
818+ public boolean equals (Object o ) {
819+ if (this == o ) {
820+ return true ;
821+ }
822+ if (!(o instanceof RetryPolicy )) {
823+ return false ;
824+ }
825+ RetryPolicy that = (RetryPolicy ) o ;
826+ return maxAttempts == that .maxAttempts
827+ && Double .compare (backoffMultiplier , that .backoffMultiplier ) == 0
828+ && Double .compare (initialBackoffInSeconds , that .initialBackoffInSeconds ) == 0
829+ && Double .compare (maxBackoffInSeconds , that .maxBackoffInSeconds ) == 0
830+ && Objects .equal (retryableStatusCodes , that .retryableStatusCodes );
831+ }
832+
833+ @ Override
834+ public int hashCode () {
835+ return Objects .hashCode (
836+ maxAttempts , initialBackoffInSeconds , maxBackoffInSeconds , backoffMultiplier ,
837+ retryableStatusCodes );
838+ }
839+ }
780840}
0 commit comments