Skip to content

Commit d3c5b00

Browse files
committed
Add CallOptions.
- Pass CallOptions to Channel.newCall() and ClientInterceptor.interceptCall(). - Remove timeout from AbstractStub.StubConfigBuilder and add deadline, which is stored in a CallOptions inside the stub. - Deadline is in nanoseconds in the clock defined by System.nanoTime(). It is converted to timeout before transmitting on the wire. Fail the call with DEADLINE_EXCEEDED if it's already expired.
1 parent 12d5797 commit d3c5b00

File tree

28 files changed

+719
-243
lines changed

28 files changed

+719
-243
lines changed

auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.auth.Credentials;
3535
import com.google.common.base.Preconditions;
3636

37+
import io.grpc.CallOptions;
3738
import io.grpc.Channel;
3839
import io.grpc.ClientCall;
3940
import io.grpc.ClientInterceptor;
@@ -68,10 +69,10 @@ public ClientAuthInterceptor(Credentials credentials, Executor executor) {
6869

6970
@Override
7071
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
71-
Channel next) {
72+
CallOptions callOptions, Channel next) {
7273
// TODO(ejona86): If the call fails for Auth reasons, this does not properly propagate info that
7374
// would be in WWW-Authenticate, because it does not yet have access to the header.
74-
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method)) {
75+
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
7576
@Override
7677
protected void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
7778
throws Exception {

auth/src/test/java/io/grpc/auth/ClientAuthInterceptorTests.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@
3131

3232
package io.grpc.auth;
3333

34+
import static org.mockito.Mockito.any;
3435
import static org.mockito.Mockito.isA;
3536
import static org.mockito.Mockito.never;
37+
import static org.mockito.Mockito.same;
3638
import static org.mockito.Mockito.verify;
3739
import static org.mockito.Mockito.when;
3840

@@ -44,6 +46,7 @@
4446
import com.google.common.collect.ListMultimap;
4547
import com.google.common.collect.Multimaps;
4648

49+
import io.grpc.CallOptions;
4750
import io.grpc.Channel;
4851
import io.grpc.ClientCall;
4952
import io.grpc.Metadata;
@@ -96,7 +99,7 @@ public class ClientAuthInterceptorTests {
9699
@Before
97100
public void startUp() throws IOException {
98101
MockitoAnnotations.initMocks(this);
99-
when(channel.newCall(descriptor)).thenReturn(call);
102+
when(channel.newCall(same(descriptor), any(CallOptions.class))).thenReturn(call);
100103
interceptor = new ClientAuthInterceptor(credentials,
101104
Executors.newSingleThreadExecutor());
102105
}
@@ -109,7 +112,8 @@ public void testCopyCredentialToHeaders() throws IOException {
109112
values.put("Extra-Authorization", "token3");
110113
values.put("Extra-Authorization", "token4");
111114
when(credentials.getRequestMetadata()).thenReturn(Multimaps.asMap(values));
112-
ClientCall<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
115+
ClientCall<String, Integer> interceptedCall =
116+
interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
113117
Metadata.Headers headers = new Metadata.Headers();
114118
interceptedCall.start(listener, headers);
115119
verify(call).start(listener, headers);
@@ -125,7 +129,8 @@ public void testCopyCredentialToHeaders() throws IOException {
125129
@Test
126130
public void testCredentialsThrows() throws IOException {
127131
when(credentials.getRequestMetadata()).thenThrow(new IOException("Broken"));
128-
ClientCall<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
132+
ClientCall<String, Integer> interceptedCall =
133+
interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
129134
Metadata.Headers headers = new Metadata.Headers();
130135
interceptedCall.start(listener, headers);
131136
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
@@ -146,7 +151,8 @@ public AccessToken refreshAccessToken() throws IOException {
146151
}
147152
};
148153
interceptor = new ClientAuthInterceptor(oAuth2Credentials, Executors.newSingleThreadExecutor());
149-
ClientCall<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
154+
ClientCall<String, Integer> interceptedCall =
155+
interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
150156
Metadata.Headers headers = new Metadata.Headers();
151157
interceptedCall.start(listener, headers);
152158
verify(call).start(listener, headers);

benchmarks/src/generated/main/io/grpc/testing/TestServiceGrpc.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -113,24 +113,31 @@ private TestServiceStub(io.grpc.Channel channel,
113113
super(channel, config);
114114
}
115115

116+
private TestServiceStub(io.grpc.Channel channel,
117+
TestServiceServiceDescriptor config,
118+
io.grpc.CallOptions callOptions) {
119+
super(channel, config, callOptions);
120+
}
121+
116122
@java.lang.Override
117123
protected TestServiceStub build(io.grpc.Channel channel,
118-
TestServiceServiceDescriptor config) {
119-
return new TestServiceStub(channel, config);
124+
TestServiceServiceDescriptor config,
125+
io.grpc.CallOptions callOptions) {
126+
return new TestServiceStub(channel, config, callOptions);
120127
}
121128

122129
@java.lang.Override
123130
public void unaryCall(io.grpc.testing.SimpleRequest request,
124131
io.grpc.stub.StreamObserver<io.grpc.testing.SimpleResponse> responseObserver) {
125132
asyncUnaryCall(
126-
channel.newCall(config.unaryCall), request, responseObserver);
133+
channel.newCall(config.unaryCall, callOptions), request, responseObserver);
127134
}
128135

129136
@java.lang.Override
130137
public io.grpc.stub.StreamObserver<io.grpc.testing.SimpleRequest> streamingCall(
131138
io.grpc.stub.StreamObserver<io.grpc.testing.SimpleResponse> responseObserver) {
132139
return duplexStreamingCall(
133-
channel.newCall(config.streamingCall), responseObserver);
140+
channel.newCall(config.streamingCall, callOptions), responseObserver);
134141
}
135142
}
136143

@@ -142,16 +149,23 @@ private TestServiceBlockingStub(io.grpc.Channel channel,
142149
super(channel, config);
143150
}
144151

152+
private TestServiceBlockingStub(io.grpc.Channel channel,
153+
TestServiceServiceDescriptor config,
154+
io.grpc.CallOptions callOptions) {
155+
super(channel, config, callOptions);
156+
}
157+
145158
@java.lang.Override
146159
protected TestServiceBlockingStub build(io.grpc.Channel channel,
147-
TestServiceServiceDescriptor config) {
148-
return new TestServiceBlockingStub(channel, config);
160+
TestServiceServiceDescriptor config,
161+
io.grpc.CallOptions callOptions) {
162+
return new TestServiceBlockingStub(channel, config, callOptions);
149163
}
150164

151165
@java.lang.Override
152166
public io.grpc.testing.SimpleResponse unaryCall(io.grpc.testing.SimpleRequest request) {
153167
return blockingUnaryCall(
154-
channel.newCall(config.unaryCall), request);
168+
channel.newCall(config.unaryCall, callOptions), request);
155169
}
156170
}
157171

@@ -163,17 +177,24 @@ private TestServiceFutureStub(io.grpc.Channel channel,
163177
super(channel, config);
164178
}
165179

180+
private TestServiceFutureStub(io.grpc.Channel channel,
181+
TestServiceServiceDescriptor config,
182+
io.grpc.CallOptions callOptions) {
183+
super(channel, config, callOptions);
184+
}
185+
166186
@java.lang.Override
167187
protected TestServiceFutureStub build(io.grpc.Channel channel,
168-
TestServiceServiceDescriptor config) {
169-
return new TestServiceFutureStub(channel, config);
188+
TestServiceServiceDescriptor config,
189+
io.grpc.CallOptions callOptions) {
190+
return new TestServiceFutureStub(channel, config, callOptions);
170191
}
171192

172193
@java.lang.Override
173194
public com.google.common.util.concurrent.ListenableFuture<io.grpc.testing.SimpleResponse> unaryCall(
174195
io.grpc.testing.SimpleRequest request) {
175196
return unaryFutureCall(
176-
channel.newCall(config.unaryCall), request);
197+
channel.newCall(config.unaryCall, callOptions), request);
177198
}
178199
}
179200

benchmarks/src/generated/main/io/grpc/testing/WorkerGrpc.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -108,24 +108,31 @@ private WorkerStub(io.grpc.Channel channel,
108108
super(channel, config);
109109
}
110110

111+
private WorkerStub(io.grpc.Channel channel,
112+
WorkerServiceDescriptor config,
113+
io.grpc.CallOptions callOptions) {
114+
super(channel, config, callOptions);
115+
}
116+
111117
@java.lang.Override
112118
protected WorkerStub build(io.grpc.Channel channel,
113-
WorkerServiceDescriptor config) {
114-
return new WorkerStub(channel, config);
119+
WorkerServiceDescriptor config,
120+
io.grpc.CallOptions callOptions) {
121+
return new WorkerStub(channel, config, callOptions);
115122
}
116123

117124
@java.lang.Override
118125
public io.grpc.stub.StreamObserver<io.grpc.testing.ClientArgs> runTest(
119126
io.grpc.stub.StreamObserver<io.grpc.testing.ClientStatus> responseObserver) {
120127
return duplexStreamingCall(
121-
channel.newCall(config.runTest), responseObserver);
128+
channel.newCall(config.runTest, callOptions), responseObserver);
122129
}
123130

124131
@java.lang.Override
125132
public io.grpc.stub.StreamObserver<io.grpc.testing.ServerArgs> runServer(
126133
io.grpc.stub.StreamObserver<io.grpc.testing.ServerStatus> responseObserver) {
127134
return duplexStreamingCall(
128-
channel.newCall(config.runServer), responseObserver);
135+
channel.newCall(config.runServer, callOptions), responseObserver);
129136
}
130137
}
131138

@@ -137,10 +144,17 @@ private WorkerBlockingStub(io.grpc.Channel channel,
137144
super(channel, config);
138145
}
139146

147+
private WorkerBlockingStub(io.grpc.Channel channel,
148+
WorkerServiceDescriptor config,
149+
io.grpc.CallOptions callOptions) {
150+
super(channel, config, callOptions);
151+
}
152+
140153
@java.lang.Override
141154
protected WorkerBlockingStub build(io.grpc.Channel channel,
142-
WorkerServiceDescriptor config) {
143-
return new WorkerBlockingStub(channel, config);
155+
WorkerServiceDescriptor config,
156+
io.grpc.CallOptions callOptions) {
157+
return new WorkerBlockingStub(channel, config, callOptions);
144158
}
145159
}
146160

@@ -152,10 +166,17 @@ private WorkerFutureStub(io.grpc.Channel channel,
152166
super(channel, config);
153167
}
154168

169+
private WorkerFutureStub(io.grpc.Channel channel,
170+
WorkerServiceDescriptor config,
171+
io.grpc.CallOptions callOptions) {
172+
super(channel, config, callOptions);
173+
}
174+
155175
@java.lang.Override
156176
protected WorkerFutureStub build(io.grpc.Channel channel,
157-
WorkerServiceDescriptor config) {
158-
return new WorkerFutureStub(channel, config);
177+
WorkerServiceDescriptor config,
178+
io.grpc.CallOptions callOptions) {
179+
return new WorkerFutureStub(channel, config, callOptions);
159180
}
160181
}
161182

benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.util.concurrent.MoreExecutors;
44

5+
import io.grpc.CallOptions;
56
import io.grpc.ChannelImpl;
67
import io.grpc.ClientCall;
78
import io.grpc.DeferredInputStream;
@@ -97,6 +98,8 @@ public enum ChannelType {
9798
NIO, LOCAL;
9899
}
99100

101+
private static final CallOptions CALL_OPTIONS = CallOptions.DEFAULT;
102+
100103
private static final InetAddress BENCHMARK_ADDR;
101104

102105
/**
@@ -222,20 +225,14 @@ public void setup(ExecutorType clientExecutor,
222225
// Simple method that sends and receives NettyByteBuf
223226
unaryMethod = MethodDescriptor.create(MethodType.UNARY,
224227
"benchmark/unary",
225-
5,
226-
TimeUnit.SECONDS,
227228
new ByteBufOutputMarshaller(),
228229
new ByteBufOutputMarshaller());
229230
pingPongMethod = MethodDescriptor.create(MethodType.DUPLEX_STREAMING,
230231
"benchmark/pingPong",
231-
5,
232-
TimeUnit.SECONDS,
233232
new ByteBufOutputMarshaller(),
234233
new ByteBufOutputMarshaller());
235234
flowControlledStreaming = MethodDescriptor.create(MethodType.DUPLEX_STREAMING,
236235
"benchmark/flowControlledStreaming",
237-
5,
238-
TimeUnit.SECONDS,
239236
new ByteBufOutputMarshaller(),
240237
new ByteBufOutputMarshaller());
241238

@@ -394,7 +391,8 @@ public void onError(Throwable t) {
394391
public void onCompleted() {
395392
if (!done.get()) {
396393
ByteBuf slice = request.slice();
397-
ClientCalls.asyncUnaryCall(channel.newCall(unaryMethod), slice, this);
394+
ClientCalls.asyncUnaryCall(
395+
channel.newCall(unaryMethod, CALL_OPTIONS), slice, this);
398396
}
399397
}
400398
};
@@ -414,7 +412,8 @@ protected void startStreamingCalls(int callsPerChannel,
414412
final long counterDelta) {
415413
for (final ChannelImpl channel : channels) {
416414
for (int i = 0; i < callsPerChannel; i++) {
417-
final ClientCall<ByteBuf, ByteBuf> streamingCall = channel.newCall(pingPongMethod);
415+
final ClientCall<ByteBuf, ByteBuf> streamingCall =
416+
channel.newCall(pingPongMethod, CALL_OPTIONS);
418417
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
419418
new AtomicReference<StreamObserver<ByteBuf>>();
420419
StreamObserver<ByteBuf> requestObserver = ClientCalls.duplexStreamingCall(streamingCall,
@@ -457,7 +456,8 @@ protected void startFlowControlledStreamingCalls(int callsPerChannel,
457456
final long counterDelta) {
458457
for (final ChannelImpl channel : channels) {
459458
for (int i = 0; i < callsPerChannel; i++) {
460-
final ClientCall<ByteBuf, ByteBuf> streamingCall = channel.newCall(flowControlledStreaming);
459+
final ClientCall<ByteBuf, ByteBuf> streamingCall =
460+
channel.newCall(flowControlledStreaming, CALL_OPTIONS);
461461
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
462462
new AtomicReference<StreamObserver<ByteBuf>>();
463463
StreamObserver<ByteBuf> requestObserver = ClientCalls.duplexStreamingCall(streamingCall,

benchmarks/src/jmh/java/io/grpc/benchmarks/netty/SingleThreadBlockingQpsBenchmark.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.grpc.benchmarks.netty;
22

3+
import io.grpc.CallOptions;
34
import io.grpc.stub.ClientCalls;
45
import io.netty.buffer.Unpooled;
56

@@ -50,7 +51,8 @@ public void teardown() throws Exception {
5051
*/
5152
@Benchmark
5253
public void blockingUnary() throws Exception {
53-
ClientCalls.blockingUnaryCall(channels[0].newCall(unaryMethod), Unpooled.EMPTY_BUFFER);
54+
ClientCalls.blockingUnaryCall(
55+
channels[0].newCall(unaryMethod, CallOptions.DEFAULT), Unpooled.EMPTY_BUFFER);
5456
}
5557

5658
/**

compiler/src/java_plugin/cpp/java_generator.cpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -326,15 +326,25 @@ static void PrintStub(const google::protobuf::ServiceDescriptor* service,
326326
p->Print("super(channel, config);\n");
327327
p->Outdent();
328328
p->Print("}\n\n");
329+
p->Print(
330+
*vars,
331+
"private $impl_name$($Channel$ channel,\n"
332+
" $service_name$ServiceDescriptor config,\n"
333+
" $CallOptions$ callOptions) {\n");
334+
p->Indent();
335+
p->Print("super(channel, config, callOptions);\n");
336+
p->Outdent();
337+
p->Print("}\n\n");
329338
p->Print(
330339
*vars,
331340
"@$Override$\n"
332341
"protected $impl_name$ build($Channel$ channel,\n"
333-
" $service_name$ServiceDescriptor config) {\n");
342+
" $service_name$ServiceDescriptor config,\n"
343+
" $CallOptions$ callOptions) {\n");
334344
p->Indent();
335345
p->Print(
336346
*vars,
337-
"return new $impl_name$(channel, config);\n");
347+
"return new $impl_name$(channel, config, callOptions);\n");
338348
p->Outdent();
339349
p->Print("}\n");
340350
}
@@ -430,7 +440,7 @@ static void PrintStub(const google::protobuf::ServiceDescriptor* service,
430440
p->Print(
431441
*vars,
432442
"return $calls_method$(\n"
433-
" channel.newCall(config.$lower_method_name$), $params$);\n");
443+
" channel.newCall(config.$lower_method_name$, callOptions), $params$);\n");
434444
break;
435445
case ASYNC_CALL:
436446
if (server_streaming) {
@@ -454,7 +464,7 @@ static void PrintStub(const google::protobuf::ServiceDescriptor* service,
454464
p->Print(
455465
*vars,
456466
"$last_line_prefix$$calls_method$(\n"
457-
" channel.newCall(config.$lower_method_name$), $params$);\n");
467+
" channel.newCall(config.$lower_method_name$, callOptions), $params$);\n");
458468
break;
459469
case FUTURE_CALL:
460470
CHECK(!client_streaming && !server_streaming)
@@ -465,7 +475,7 @@ static void PrintStub(const google::protobuf::ServiceDescriptor* service,
465475
p->Print(
466476
*vars,
467477
"return $calls_method$(\n"
468-
" channel.newCall(config.$lower_method_name$), request);\n");
478+
" channel.newCall(config.$lower_method_name$, callOptions), request);\n");
469479
break;
470480
}
471481
p->Outdent();
@@ -653,6 +663,7 @@ void GenerateService(const ServiceDescriptor* service,
653663
vars["String"] = "java.lang.String";
654664
vars["Override"] = "java.lang.Override";
655665
vars["Channel"] = "io.grpc.Channel";
666+
vars["CallOptions"] = "io.grpc.CallOptions";
656667
vars["MethodType"] = "io.grpc.MethodType";
657668
vars["ServerServiceDefinition"] =
658669
"io.grpc.ServerServiceDefinition";

0 commit comments

Comments
 (0)