Skip to content

Commit 83b92cf

Browse files
authored
core: pass transport attributes to ClientStreamTracer.Factory.newClientStreamTracer() (grpc#5380)
This will be a new override. The old override is now deprecated. In order to pass new information without adding new overrides, I shoved most information into an object called StreamInfo. The Metadata is left out to draw attention because it's mutable. Motivation: this is needed for correctly supporting pick_first in GRPCLB. GRPCLB needs to add a token to the headers, and the token varies among servers. With round_robin, GRPCLB create a Subchannel for each server, thus can attach the token when the Subchannel is picked. To implement pick_first, all server addresses will be put in a single Subchannel, we will need to add the header in newClientStreamTracer(), by looking up the server address from the transport attributes and deciding which token to add.
1 parent 6c32eaf commit 83b92cf

File tree

17 files changed

+135
-37
lines changed

17 files changed

+135
-37
lines changed

core/src/jmh/java/io/grpc/internal/StatsTraceContextBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.grpc.internal;
1818

19+
import io.grpc.Attributes;
1920
import io.grpc.CallOptions;
2021
import io.grpc.Metadata;
2122
import io.grpc.MethodDescriptor;
@@ -49,7 +50,7 @@ public class StatsTraceContextBenchmark {
4950
@BenchmarkMode(Mode.SampleTime)
5051
@OutputTimeUnit(TimeUnit.NANOSECONDS)
5152
public StatsTraceContext newClientContext() {
52-
return StatsTraceContext.newClientContext(CallOptions.DEFAULT, emptyMetadata);
53+
return StatsTraceContext.newClientContext(CallOptions.DEFAULT, Attributes.EMPTY, emptyMetadata);
5354
}
5455

5556
/**

core/src/main/java/io/grpc/ClientStreamTracer.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.grpc;
1818

19+
import io.grpc.Grpc;
1920
import javax.annotation.concurrent.ThreadSafe;
2021

2122
/**
@@ -57,9 +58,44 @@ public abstract static class Factory {
5758
* @param headers the mutable headers of the stream. It can be safely mutated within this
5859
* method. It should not be saved because it is not safe for read or write after the
5960
* method returns.
61+
*
62+
* @deprecated use {@link #newClientStreamTracer(StreamInfo, Metadata)} instead
6063
*/
64+
@Deprecated
6165
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
6266
throw new UnsupportedOperationException("Not implemented");
6367
}
68+
69+
/**
70+
* Creates a {@link ClientStreamTracer} for a new client stream. This is called inside the
71+
* transport when it's creating the stream.
72+
*
73+
* @param info information about the stream
74+
* @param headers the mutable headers of the stream. It can be safely mutated within this
75+
* method. Changes made to it will be sent by the stream. It should not be saved
76+
* because it is not safe for read or write after the method returns.
77+
*
78+
* @since 1.20.0
79+
*/
80+
@SuppressWarnings("deprecation")
81+
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
82+
return newClientStreamTracer(info.getCallOptions(), headers);
83+
}
84+
}
85+
86+
/**
87+
* Information about a stream.
88+
*/
89+
public abstract static class StreamInfo {
90+
/**
91+
* Returns the attributes of the transport that this stream was created on.
92+
*/
93+
@Grpc.TransportAttr
94+
public abstract Attributes getTransportAttrs();
95+
96+
/**
97+
* Returns the effective CallOptions of the call.
98+
*/
99+
public abstract CallOptions getCallOptions();
64100
}
65101
}

core/src/main/java/io/grpc/inprocess/InProcessTransport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public synchronized ClientStream newStream(
167167
final MethodDescriptor<?, ?> method, final Metadata headers, final CallOptions callOptions) {
168168
if (shutdownStatus != null) {
169169
return failedClientStream(
170-
StatsTraceContext.newClientContext(callOptions, headers), shutdownStatus);
170+
StatsTraceContext.newClientContext(callOptions, attributes, headers), shutdownStatus);
171171
}
172172

173173
headers.put(GrpcUtil.USER_AGENT_KEY, userAgent);
@@ -186,7 +186,7 @@ public synchronized ClientStream newStream(
186186
serverMaxInboundMetadataSize,
187187
metadataSize));
188188
return failedClientStream(
189-
StatsTraceContext.newClientContext(callOptions, headers), status);
189+
StatsTraceContext.newClientContext(callOptions, attributes, headers), status);
190190
}
191191
}
192192

@@ -625,7 +625,7 @@ private class InProcessClientStream implements ClientStream {
625625

626626
InProcessClientStream(CallOptions callOptions, Metadata headers) {
627627
this.callOptions = callOptions;
628-
statsTraceCtx = StatsTraceContext.newClientContext(callOptions, headers);
628+
statsTraceCtx = StatsTraceContext.newClientContext(callOptions, attributes, headers);
629629
}
630630

631631
private synchronized void setListener(ServerStreamListener listener) {

core/src/main/java/io/grpc/internal/CensusStatsModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,8 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory {
367367
}
368368

369369
@Override
370-
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
370+
public ClientStreamTracer newClientStreamTracer(
371+
ClientStreamTracer.StreamInfo info, Metadata headers) {
371372
ClientTracer tracer = new ClientTracer(module, startCtx);
372373
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
373374
// one streams. We will need to update this file to support them.

core/src/main/java/io/grpc/internal/CensusTracingModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ final class ClientCallTracer extends ClientStreamTracer.Factory {
242242
}
243243

244244
@Override
245-
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
245+
public ClientStreamTracer newClientStreamTracer(
246+
ClientStreamTracer.StreamInfo info, Metadata headers) {
246247
if (span != BlankSpan.INSTANCE) {
247248
headers.discardAll(tracingHeader);
248249
headers.put(tracingHeader, span.getContext());

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.base.Objects;
2424
import io.grpc.Attributes;
25-
import io.grpc.CallOptions;
2625
import io.grpc.ClientStreamTracer;
2726
import io.grpc.Compressor;
2827
import io.grpc.Deadline;
@@ -194,7 +193,8 @@ private Substream createSubstream(int previousAttemptCount) {
194193
final ClientStreamTracer bufferSizeTracer = new BufferSizeTracer(sub);
195194
ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
196195
@Override
197-
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
196+
public ClientStreamTracer newClientStreamTracer(
197+
ClientStreamTracer.StreamInfo info, Metadata headers) {
198198
return bufferSizeTracer;
199199
}
200200
};

core/src/main/java/io/grpc/internal/StatsTraceContext.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22+
import io.grpc.Attributes;
2223
import io.grpc.CallOptions;
2324
import io.grpc.ClientStreamTracer;
2425
import io.grpc.Context;
@@ -46,16 +47,28 @@ public final class StatsTraceContext {
4647
/**
4748
* Factory method for the client-side.
4849
*/
49-
public static StatsTraceContext newClientContext(CallOptions callOptions, Metadata headers) {
50+
public static StatsTraceContext newClientContext(
51+
final CallOptions callOptions, final Attributes transportAttrs, Metadata headers) {
5052
List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
5153
if (factories.isEmpty()) {
5254
return NOOP;
5355
}
56+
ClientStreamTracer.StreamInfo info = new ClientStreamTracer.StreamInfo() {
57+
@Override
58+
public Attributes getTransportAttrs() {
59+
return transportAttrs;
60+
}
61+
62+
@Override
63+
public CallOptions getCallOptions() {
64+
return callOptions;
65+
}
66+
};
5467
// This array will be iterated multiple times per RPC. Use primitive array instead of Collection
5568
// so that for-each doesn't create an Iterator every time.
5669
StreamTracer[] tracers = new StreamTracer[factories.size()];
5770
for (int i = 0; i < tracers.length; i++) {
58-
tracers[i] = factories.get(i).newClientStreamTracer(callOptions, headers);
71+
tracers[i] = factories.get(i).newClientStreamTracer(info, headers);
5972
}
6073
return new StatsTraceContext(tracers);
6174
}

core/src/test/java/io/grpc/CallOptionsTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ private static class FakeTracerFactory extends ClientStreamTracer.Factory {
270270
}
271271

272272
@Override
273-
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
273+
public ClientStreamTracer newClientStreamTracer(
274+
ClientStreamTracer.StreamInfo info, Metadata headers) {
274275
return new ClientStreamTracer() {};
275276
}
276277

core/src/test/java/io/grpc/internal/CensusModulesTest.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,18 @@ public class CensusModulesTest {
105105
CallOptions.Key.createWithDefault("option1", "default");
106106
private static final CallOptions CALL_OPTIONS =
107107
CallOptions.DEFAULT.withOption(CUSTOM_OPTION, "customvalue");
108+
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
109+
new ClientStreamTracer.StreamInfo() {
110+
@Override
111+
public Attributes getTransportAttrs() {
112+
return Attributes.EMPTY;
113+
}
114+
115+
@Override
116+
public CallOptions getCallOptions() {
117+
return CallOptions.DEFAULT;
118+
}
119+
};
108120

109121
private static class StringInputStream extends InputStream {
110122
final String string;
@@ -370,7 +382,7 @@ private void subtestClientBasicStatsDefaultContext(
370382
localCensusStats.newClientCallTracer(
371383
tagger.empty(), method.getFullMethodName());
372384
Metadata headers = new Metadata();
373-
ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
385+
ClientStreamTracer tracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
374386

375387
if (recordStarts) {
376388
StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord();
@@ -494,8 +506,7 @@ public void clientBasicTracingDefaultSpan() {
494506
CensusTracingModule.ClientCallTracer callTracer =
495507
censusTracing.newClientCallTracer(null, method);
496508
Metadata headers = new Metadata();
497-
ClientStreamTracer clientStreamTracer =
498-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
509+
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(STREAM_INFO, headers);
499510
verify(tracer).spanBuilderWithExplicitParent(
500511
eq("Sent.package1.service2.method3"), isNull(Span.class));
501512
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
@@ -655,7 +666,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS
655666
CensusStatsModule.ClientCallTracer callTracer =
656667
census.newClientCallTracer(clientCtx, method.getFullMethodName());
657668
// This propagates clientCtx to headers if propagates==true
658-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
669+
callTracer.newClientStreamTracer(STREAM_INFO, headers);
659670
if (recordStats) {
660671
// Client upstart record
661672
StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord();
@@ -744,7 +755,7 @@ public void statsHeadersNotPropagateDefaultContext() {
744755
CensusStatsModule.ClientCallTracer callTracer =
745756
censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName());
746757
Metadata headers = new Metadata();
747-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
758+
callTracer.newClientStreamTracer(STREAM_INFO, headers);
748759
assertFalse(headers.containsKey(censusStats.statsHeader));
749760
// Clear recorded stats to satisfy the assertions in wrapUp()
750761
statsRecorder.rolloverRecords();
@@ -775,7 +786,7 @@ public void traceHeadersPropagateSpanContext() throws Exception {
775786
CensusTracingModule.ClientCallTracer callTracer =
776787
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
777788
Metadata headers = new Metadata();
778-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
789+
callTracer.newClientStreamTracer(STREAM_INFO, headers);
779790

780791
verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext));
781792
verifyNoMoreInteractions(mockTracingPropagationHandler);
@@ -803,7 +814,7 @@ public void traceHeaders_propagateSpanContext() throws Exception {
803814
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
804815
Metadata headers = new Metadata();
805816

806-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
817+
callTracer.newClientStreamTracer(STREAM_INFO, headers);
807818

808819
assertThat(headers.keys()).isNotEmpty();
809820
}
@@ -817,7 +828,7 @@ public void traceHeaders_missingCensusImpl_notPropagateSpanContext()
817828

818829
CensusTracingModule.ClientCallTracer callTracer =
819830
censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method);
820-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
831+
callTracer.newClientStreamTracer(STREAM_INFO, headers);
821832

822833
assertThat(headers.keys()).isEmpty();
823834
}
@@ -834,7 +845,7 @@ public void traceHeaders_clientMissingCensusImpl_preservingHeaders() throws Exce
834845

835846
CensusTracingModule.ClientCallTracer callTracer =
836847
censusTracing.newClientCallTracer(BlankSpan.INSTANCE, method);
837-
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
848+
callTracer.newClientStreamTracer(STREAM_INFO, headers);
838849

839850
assertThat(headers.keys()).containsExactlyElementsIn(originalHeaderKeys);
840851
}

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141

4242
import com.google.common.collect.ImmutableSet;
4343
import com.google.common.util.concurrent.MoreExecutors;
44+
import io.grpc.Attributes;
4445
import io.grpc.CallOptions;
4546
import io.grpc.ClientStreamTracer;
4647
import io.grpc.Codec;
@@ -90,6 +91,18 @@ public class RetriableStreamTest {
9091
private static final long MAX_BACKOFF_IN_SECONDS = 700;
9192
private static final double BACKOFF_MULTIPLIER = 2D;
9293
private static final double FAKE_RANDOM = .5D;
94+
private static final ClientStreamTracer.StreamInfo STREAM_INFO =
95+
new ClientStreamTracer.StreamInfo() {
96+
@Override
97+
public Attributes getTransportAttrs() {
98+
return Attributes.EMPTY;
99+
}
100+
101+
@Override
102+
public CallOptions getCallOptions() {
103+
return CallOptions.DEFAULT;
104+
}
105+
};
93106

94107
static {
95108
RetriableStream.setRandom(
@@ -168,7 +181,7 @@ void postCommit() {
168181
@Override
169182
ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) {
170183
bufferSizeTracer =
171-
tracerFactory.newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
184+
tracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
172185
int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null
173186
? 0 : Integer.valueOf(metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
174187
return retriableStreamRecorder.newSubstream(actualPreviousRpcAttemptsInHeader);

0 commit comments

Comments
 (0)