Skip to content

Commit 141a1d2

Browse files
authored
core: install the binary logging client interceptor (grpc#3937)
1 parent 2601d54 commit 141a1d2

11 files changed

Lines changed: 570 additions & 63 deletions

core/src/main/java/io/grpc/ClientInterceptors.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package io.grpc;
1818

1919
import com.google.common.base.Preconditions;
20+
import io.grpc.MethodDescriptor.Marshaller;
21+
import java.io.InputStream;
2022
import java.util.ArrayList;
2123
import java.util.Arrays;
2224
import java.util.Collections;
@@ -89,6 +91,56 @@ public static Channel intercept(Channel channel, List<? extends ClientIntercepto
8991
return channel;
9092
}
9193

94+
/**
95+
* Creates a new ClientInterceptor that transforms requests into {@code WReqT} and responses into
96+
* {@code WRespT} before passing them into the {@code interceptor}.
97+
*/
98+
static <WReqT, WRespT> ClientInterceptor wrapClientInterceptor(
99+
final ClientInterceptor interceptor,
100+
final Marshaller<WReqT> reqMarshaller,
101+
final Marshaller<WRespT> respMarshaller) {
102+
return new ClientInterceptor() {
103+
@Override
104+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
105+
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
106+
final MethodDescriptor<WReqT, WRespT> wrappedMethod =
107+
method.toBuilder(reqMarshaller, respMarshaller).build();
108+
final ClientCall<WReqT, WRespT> wrappedCall =
109+
interceptor.interceptCall(wrappedMethod, callOptions, next);
110+
return new PartialForwardingClientCall<ReqT, RespT>() {
111+
@Override
112+
public void start(final Listener<RespT> responseListener, Metadata headers) {
113+
wrappedCall.start(new PartialForwardingClientCallListener<WRespT>() {
114+
@Override
115+
public void onMessage(WRespT wMessage) {
116+
InputStream bytes = respMarshaller.stream(wMessage);
117+
RespT message = method.getResponseMarshaller().parse(bytes);
118+
responseListener.onMessage(message);
119+
}
120+
121+
@Override
122+
protected Listener<?> delegate() {
123+
return responseListener;
124+
}
125+
}, headers);
126+
}
127+
128+
@Override
129+
public void sendMessage(ReqT message) {
130+
InputStream bytes = method.getRequestMarshaller().stream(message);
131+
WReqT wReq = reqMarshaller.parse(bytes);
132+
wrappedCall.sendMessage(wReq);
133+
}
134+
135+
@Override
136+
protected ClientCall<?, ?> delegate() {
137+
return wrappedCall;
138+
}
139+
};
140+
}
141+
};
142+
}
143+
92144
private static class InterceptorChannel extends Channel {
93145
private final Channel channel;
94146
private final ClientInterceptor interceptor;

core/src/main/java/io/grpc/ForwardingClientCall.java

Lines changed: 3 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,57 +16,27 @@
1616

1717
package io.grpc;
1818

19-
import javax.annotation.Nullable;
20-
2119
/**
2220
* A {@link ClientCall} which forwards all of it's methods to another {@link ClientCall}.
2321
*/
24-
public abstract class ForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
22+
public abstract class ForwardingClientCall<ReqT, RespT>
23+
extends PartialForwardingClientCall<ReqT, RespT> {
2524
/**
2625
* Returns the delegated {@code ClientCall}.
2726
*/
27+
@Override
2828
protected abstract ClientCall<ReqT, RespT> delegate();
2929

3030
@Override
3131
public void start(Listener<RespT> responseListener, Metadata headers) {
3232
delegate().start(responseListener, headers);
3333
}
3434

35-
@Override
36-
public void request(int numMessages) {
37-
delegate().request(numMessages);
38-
}
39-
40-
@Override
41-
public void cancel(@Nullable String message, @Nullable Throwable cause) {
42-
delegate().cancel(message, cause);
43-
}
44-
45-
@Override
46-
public void halfClose() {
47-
delegate().halfClose();
48-
}
49-
5035
@Override
5136
public void sendMessage(ReqT message) {
5237
delegate().sendMessage(message);
5338
}
5439

55-
@Override
56-
public void setMessageCompression(boolean enabled) {
57-
delegate().setMessageCompression(enabled);
58-
}
59-
60-
@Override
61-
public boolean isReady() {
62-
return delegate().isReady();
63-
}
64-
65-
@Override
66-
public Attributes getAttributes() {
67-
return delegate().getAttributes();
68-
}
69-
7040
/**
7141
* A simplified version of {@link ForwardingClientCall} where subclasses can pass in a {@link
7242
* ClientCall} as the delegate.

core/src/main/java/io/grpc/ForwardingClientCallListener.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,19 @@
2020
* A {@link ClientCall.Listener} which forwards all of its methods to another {@link
2121
* ClientCall.Listener}.
2222
*/
23-
public abstract class ForwardingClientCallListener<RespT> extends ClientCall.Listener<RespT> {
23+
public abstract class ForwardingClientCallListener<RespT>
24+
extends PartialForwardingClientCallListener<RespT> {
2425
/**
2526
* Returns the delegated {@code ClientCall.Listener}.
2627
*/
27-
protected abstract ClientCall.Listener<RespT> delegate();
28-
2928
@Override
30-
public void onHeaders(Metadata headers) {
31-
delegate().onHeaders(headers);
32-
}
29+
protected abstract ClientCall.Listener<RespT> delegate();
3330

3431
@Override
3532
public void onMessage(RespT message) {
3633
delegate().onMessage(message);
3734
}
3835

39-
@Override
40-
public void onClose(Status status, Metadata trailers) {
41-
delegate().onClose(status, trailers);
42-
}
43-
44-
@Override
45-
public void onReady() {
46-
delegate().onReady();
47-
}
48-
4936
/**
5037
* A simplified version of {@link ForwardingClientCallListener} where subclasses can pass in a
5138
* {@link ClientCall.Listener} as the delegate.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2017, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import io.grpc.MethodDescriptor.Marshaller;
20+
21+
/**
22+
* Accessor to internal methods of {@link ServerInterceptors}.
23+
*/
24+
@Internal
25+
public class InternalClientInterceptors {
26+
public static <WReqT, WRespT> ClientInterceptor wrapClientInterceptor(
27+
final ClientInterceptor wrappedInterceptor,
28+
final Marshaller<WReqT> reqMarshaller,
29+
final Marshaller<WRespT> respMarshaller) {
30+
return ClientInterceptors.wrapClientInterceptor(
31+
wrappedInterceptor, reqMarshaller, respMarshaller);
32+
}
33+
34+
private InternalClientInterceptors() {
35+
}
36+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2018, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import javax.annotation.Nullable;
20+
21+
/**
22+
* A {@link ClientCall} which forwards all of its methods to another {@link ClientCall} which
23+
* may have a different sendMessage() message type.
24+
*/
25+
abstract class PartialForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
26+
/**
27+
* Returns the delegated {@code ClientCall}.
28+
*/
29+
protected abstract ClientCall<?, ?> delegate();
30+
31+
@Override
32+
public void request(int numMessages) {
33+
delegate().request(numMessages);
34+
}
35+
36+
@Override
37+
public void cancel(@Nullable String message, @Nullable Throwable cause) {
38+
delegate().cancel(message, cause);
39+
}
40+
41+
@Override
42+
public void halfClose() {
43+
delegate().halfClose();
44+
}
45+
46+
@Override
47+
public void setMessageCompression(boolean enabled) {
48+
delegate().setMessageCompression(enabled);
49+
}
50+
51+
@Override
52+
public boolean isReady() {
53+
return delegate().isReady();
54+
}
55+
56+
@Override
57+
public Attributes getAttributes() {
58+
return delegate().getAttributes();
59+
}
60+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2018, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
/**
20+
* A {@link ClientCall.Listener} which forwards all of its methods to another {@link
21+
* ClientCall.Listener} which may have a different parameterized type than the
22+
* onMessage() message type.
23+
*/
24+
abstract class PartialForwardingClientCallListener<RespT> extends ClientCall.Listener<RespT> {
25+
/**
26+
* Returns the delegated {@code ClientCall.Listener}.
27+
*/
28+
protected abstract ClientCall.Listener<?> delegate();
29+
30+
@Override
31+
public void onHeaders(Metadata headers) {
32+
delegate().onHeaders(headers);
33+
}
34+
35+
@Override
36+
public void onClose(Status status, Metadata trailers) {
37+
delegate().onClose(status, trailers);
38+
}
39+
40+
@Override
41+
public void onReady() {
42+
delegate().onReady();
43+
}
44+
}

core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ public static ManagedChannelBuilder<?> forTarget(String target) {
134134

135135
private int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
136136

137+
BinaryLogProvider binlogProvider = BinaryLogProvider.provider();
138+
137139
/**
138140
* Sets the maximum message size allowed for a single gRPC frame. If an inbound messages
139141
* larger than this limit is received it will not be processed and the RPC will fail with

0 commit comments

Comments
 (0)