Skip to content

Commit 6a67a97

Browse files
lukaszx0ejona86
authored andcommitted
Add attributes to ServerCall
1 parent b37ebd6 commit 6a67a97

File tree

13 files changed

+176
-2
lines changed

13 files changed

+176
-2
lines changed

core/src/main/java/io/grpc/PartialForwardingServerCall.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,10 @@ public void setMessageCompression(boolean enabled) {
7777
public void setCompression(String compressor) {
7878
delegate().setCompression(compressor);
7979
}
80+
81+
@Override
82+
@ExperimentalApi
83+
public Attributes attributes() {
84+
return delegate().attributes();
85+
}
8086
}

core/src/main/java/io/grpc/ServerCall.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,15 @@ public void setMessageCompression(boolean enabled) {
196196
public void setCompression(String compressor) {
197197
// noop
198198
}
199+
200+
/**
201+
* Returns properties of a single call. This is a generic container which can contain any kind of
202+
* information describing call like for example remote address, TLS information (OU etc.)
203+
*
204+
* @return Attributes container
205+
*/
206+
@ExperimentalApi
207+
public Attributes attributes() {
208+
return Attributes.EMPTY;
209+
}
199210
}

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import static com.google.common.base.Preconditions.checkNotNull;
3535

36+
import io.grpc.Attributes;
3637
import io.grpc.Compressor;
3738
import io.grpc.Decompressor;
3839
import io.grpc.Metadata;
@@ -366,6 +367,11 @@ public void setCompressor(Compressor compressor) {}
366367

367368
@Override
368369
public void setDecompressor(Decompressor decompressor) {}
370+
371+
// TODO(lukasz) should we return something here?
372+
@Override public Attributes attributes() {
373+
return Attributes.EMPTY;
374+
}
369375
}
370376

371377
private class InProcessClientStream implements ClientStream {

core/src/main/java/io/grpc/internal/AbstractServerStream.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import com.google.common.base.Preconditions;
3737

38+
import io.grpc.Attributes;
3839
import io.grpc.Metadata;
3940
import io.grpc.Status;
4041

@@ -278,4 +279,8 @@ private void closeListener(Status newStatus) {
278279
listener().closed(newStatus);
279280
}
280281
}
282+
283+
@Override public Attributes attributes() {
284+
return Attributes.EMPTY;
285+
}
281286
}

core/src/main/java/io/grpc/internal/GrpcUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,14 @@
4141
import com.google.common.collect.ImmutableMap;
4242
import com.google.common.util.concurrent.ThreadFactoryBuilder;
4343

44+
import io.grpc.Attributes;
4445
import io.grpc.Metadata;
4546
import io.grpc.Status;
4647
import io.grpc.internal.SharedResourceHolder.Resource;
4748

4849
import java.lang.reflect.Method;
4950
import java.net.HttpURLConnection;
51+
import java.net.SocketAddress;
5052
import java.net.URI;
5153
import java.net.URISyntaxException;
5254
import java.util.Map.Entry;
@@ -56,6 +58,7 @@
5658
import java.util.concurrent.TimeUnit;
5759

5860
import javax.annotation.Nullable;
61+
import javax.net.ssl.SSLSession;
5962

6063
/**
6164
* Common utilities for GRPC.
@@ -92,6 +95,18 @@ public final class GrpcUtil {
9295
public static final Metadata.Key<String> USER_AGENT_KEY =
9396
Metadata.Key.of("user-agent", Metadata.ASCII_STRING_MARSHALLER);
9497

98+
/**
99+
* {@link io.grpc.Attributes.Key} for the remote address of stream call.
100+
*/
101+
public static final Attributes.Key<SocketAddress> REMOTE_ADDR_STREAM_ATTR_KEY =
102+
Attributes.Key.of("io.grpc.RemoteAddr");
103+
104+
/**
105+
* {@link io.grpc.Attributes.Key} for the SSL session of stream call.
106+
*/
107+
public static final Attributes.Key<SSLSession> SSL_SESSION_STREAM_ATTR_KEY =
108+
Attributes.Key.of("io.grpc.SslSession");
109+
95110
/**
96111
* The default port for plain-text connections.
97112
*/

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.google.common.annotations.VisibleForTesting;
4343
import com.google.common.base.Throwables;
4444

45+
import io.grpc.Attributes;
4546
import io.grpc.Codec;
4647
import io.grpc.Compressor;
4748
import io.grpc.CompressorRegistry;
@@ -200,6 +201,11 @@ ServerStreamListener newServerStreamListener(ServerCall.Listener<ReqT> listener,
200201
return new ServerStreamListenerImpl<ReqT>(this, listener, timeout, context);
201202
}
202203

204+
@Override
205+
public Attributes attributes() {
206+
return stream.attributes();
207+
}
208+
203209
/**
204210
* All of these callbacks are assumed to called on an application thread, and the caller is
205211
* responsible for handling thrown exceptions.

core/src/main/java/io/grpc/internal/ServerStream.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package io.grpc.internal;
3333

34+
import io.grpc.Attributes;
3435
import io.grpc.Metadata;
3536
import io.grpc.Status;
3637

@@ -66,4 +67,11 @@ public interface ServerStream extends Stream {
6667
* times and from any thread.
6768
*/
6869
void cancel(Status status);
70+
71+
/**
72+
* Attributes describing stream.
73+
*
74+
* @return Attributes container
75+
*/
76+
Attributes attributes();
6977
}

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import com.google.auth.oauth2.ServiceAccountCredentials;
5151
import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials;
5252
import com.google.common.collect.ImmutableList;
53+
import com.google.common.collect.Lists;
54+
import com.google.common.net.HostAndPort;
5355
import com.google.protobuf.ByteString;
5456
import com.google.protobuf.EmptyProtos.Empty;
5557

@@ -59,6 +61,7 @@
5961
import io.grpc.Metadata;
6062
import io.grpc.Server;
6163
import io.grpc.ServerBuilder;
64+
import io.grpc.ServerCall;
6265
import io.grpc.ServerInterceptor;
6366
import io.grpc.ServerInterceptors;
6467
import io.grpc.Status;
@@ -88,6 +91,8 @@
8891

8992
import java.io.IOException;
9093
import java.io.InputStream;
94+
import java.security.cert.Certificate;
95+
import java.security.cert.X509Certificate;
9196
import java.util.Arrays;
9297
import java.util.List;
9398
import java.util.concurrent.ArrayBlockingQueue;
@@ -96,13 +101,18 @@
96101
import java.util.concurrent.TimeUnit;
97102
import java.util.concurrent.atomic.AtomicReference;
98103

104+
import javax.net.ssl.SSLPeerUnverifiedException;
105+
import javax.net.ssl.SSLSession;
106+
99107
/**
100108
* Abstract base class for all GRPC transport tests.
101109
*/
102110
public abstract class AbstractInteropTest {
103111

104112
public static final Metadata.Key<Messages.SimpleContext> METADATA_KEY =
105113
ProtoUtils.keyForProto(Messages.SimpleContext.getDefaultInstance());
114+
private static final AtomicReference<ServerCall> serverCallCapture =
115+
new AtomicReference<ServerCall>();
106116
private static final AtomicReference<Metadata> requestHeadersCapture =
107117
new AtomicReference<Metadata>();
108118
private static ScheduledExecutorService testServiceExecutor;
@@ -114,6 +124,7 @@ protected static void startStaticServer(
114124
testServiceExecutor = Executors.newScheduledThreadPool(2);
115125

116126
List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder()
127+
.add(TestUtils.recordServerCallInterceptor(serverCallCapture))
117128
.add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture))
118129
.add(TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY))
119130
.add(interceptors)
@@ -920,4 +931,38 @@ protected static void assertSuccess(StreamRecorder<?> recorder) {
920931
throw new AssertionError(recorder.getError());
921932
}
922933
}
934+
935+
/** Helper for asserting remote address {@link io.grpc.ServerCall#attributes()} */
936+
protected void assertRemoteAddr(String expectedRemoteAddress) {
937+
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)
938+
.withDeadlineAfter(5, TimeUnit.SECONDS);
939+
940+
stub.unaryCall(SimpleRequest.getDefaultInstance());
941+
942+
HostAndPort remoteAddress = HostAndPort.fromString(serverCallCapture.get().attributes()
943+
.get(GrpcUtil.REMOTE_ADDR_STREAM_ATTR_KEY).toString());
944+
assertEquals(expectedRemoteAddress, remoteAddress.getHostText());
945+
}
946+
947+
/** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#attributes()} */
948+
protected void assertX500SubjectDn(String tlsInfo) {
949+
TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel)
950+
.withDeadlineAfter(5, TimeUnit.SECONDS);
951+
952+
stub.unaryCall(SimpleRequest.getDefaultInstance());
953+
954+
List<Certificate> certificates = Lists.newArrayList();
955+
SSLSession sslSession =
956+
serverCallCapture.get().attributes().get(GrpcUtil.SSL_SESSION_STREAM_ATTR_KEY);
957+
try {
958+
certificates = Arrays.asList(sslSession.getPeerCertificates());
959+
} catch (SSLPeerUnverifiedException e) {
960+
fail("No cert");
961+
}
962+
963+
X509Certificate x509cert = (X509Certificate) certificates.get(0);
964+
965+
assertEquals(1, certificates.size());
966+
assertEquals(tlsInfo, x509cert.getSubjectDN().toString());
967+
}
923968
}

interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@
3636
import io.grpc.netty.NettyChannelBuilder;
3737
import io.grpc.netty.NettyServerBuilder;
3838
import io.grpc.testing.TestUtils;
39+
import io.netty.handler.ssl.ClientAuth;
3940
import io.netty.handler.ssl.SslProvider;
4041
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
4142

4243
import org.junit.AfterClass;
4344
import org.junit.BeforeClass;
45+
import org.junit.Test;
4446
import org.junit.runner.RunWith;
4547
import org.junit.runners.JUnit4;
4648

@@ -61,6 +63,8 @@ public static void startServer() {
6163
.flowControlWindow(65 * 1024)
6264
.sslContext(GrpcSslContexts
6365
.forServer(TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"))
66+
.clientAuth(ClientAuth.REQUIRE)
67+
.trustManager(TestUtils.loadCert("ca.pem"))
6468
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
6569
.sslProvider(SslProvider.OPENSSL)
6670
.build()));
@@ -79,8 +83,10 @@ protected ManagedChannel createChannel() {
7983
try {
8084
return NettyChannelBuilder
8185
.forAddress(TestUtils.testServerAddress(serverPort))
82-
.sslContext(GrpcSslContexts.forClient()
83-
.trustManager(TestUtils.loadCert("ca.pem"))
86+
.sslContext(GrpcSslContexts
87+
.forClient()
88+
.keyManager(TestUtils.loadCert("client.pem"), TestUtils.loadCert("client.key"))
89+
.trustManager(TestUtils.loadX509Cert("ca.pem"))
8490
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
8591
.sslProvider(SslProvider.OPENSSL)
8692
.build())
@@ -89,4 +95,14 @@ protected ManagedChannel createChannel() {
8995
throw new RuntimeException(ex);
9096
}
9197
}
98+
99+
@Test(timeout = 10000)
100+
public void remoteAddr() {
101+
assertRemoteAddr("/127.0.0.1");
102+
}
103+
104+
@Test(timeout = 10000)
105+
public void tlsInfo() {
106+
assertX500SubjectDn("CN=testclient, O=Internet Widgits Pty Ltd, ST=Some-State, C=AU");
107+
}
92108
}

netty/src/main/java/io/grpc/netty/NettyServerStream.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@
3333

3434
import static com.google.common.base.Preconditions.checkNotNull;
3535

36+
import io.grpc.Attributes;
3637
import io.grpc.Metadata;
3738
import io.grpc.Status;
3839
import io.grpc.internal.AbstractServerStream;
40+
import io.grpc.internal.GrpcUtil;
3941
import io.grpc.internal.WritableBuffer;
4042
import io.netty.buffer.ByteBuf;
4143
import io.netty.channel.Channel;
@@ -44,6 +46,8 @@
4446
import io.netty.handler.codec.http2.Http2Headers;
4547
import io.netty.handler.codec.http2.Http2Stream;
4648

49+
import javax.net.ssl.SSLSession;
50+
4751
/**
4852
* Server stream for a Netty HTTP2 transport.
4953
*/
@@ -53,6 +57,7 @@ class NettyServerStream extends AbstractServerStream<Integer> {
5357
private final NettyServerHandler handler;
5458
private final Http2Stream http2Stream;
5559
private final WriteQueue writeQueue;
60+
private final Attributes attributes;
5661

5762
NettyServerStream(Channel channel, Http2Stream http2Stream, NettyServerHandler handler,
5863
int maxMessageSize) {
@@ -61,6 +66,7 @@ class NettyServerStream extends AbstractServerStream<Integer> {
6166
this.channel = checkNotNull(channel, "channel");
6267
this.http2Stream = checkNotNull(http2Stream, "http2Stream");
6368
this.handler = checkNotNull(handler, "handler");
69+
this.attributes = buildAttributes(channel);
6470
}
6571

6672
@Override
@@ -140,4 +146,21 @@ protected void sendStreamAbortToClient(Status status, Metadata trailers) {
140146
public void cancel(Status status) {
141147
writeQueue.enqueue(new CancelServerStreamCommand(this, status), true);
142148
}
149+
150+
@Override public Attributes attributes() {
151+
return attributes;
152+
}
153+
154+
private static Attributes buildAttributes(Channel channel) {
155+
// NB(lukaszx0) SSLSession will be set only if SSL handshake was successful
156+
SSLSession sslSession = null;
157+
if (channel.hasAttr(Utils.SSL_SESSION_ATTR_KEY)) {
158+
sslSession = channel.attr(Utils.SSL_SESSION_ATTR_KEY).get();
159+
}
160+
161+
return Attributes.newBuilder()
162+
.set(GrpcUtil.REMOTE_ADDR_STREAM_ATTR_KEY, channel.remoteAddress())
163+
.set(GrpcUtil.SSL_SESSION_STREAM_ATTR_KEY, sslSession)
164+
.build();
165+
}
143166
}

0 commit comments

Comments
 (0)