Skip to content

Commit 4c78a97

Browse files
authored
Plumb optional labels from LB to ClientStreamTracer
As part of gRFC A78: > To support the locality label in the per-call metrics, we will provide > a mechanism for LB picker to add optional labels to the call attempt > tracer.
1 parent 06df25b commit 4c78a97

File tree

18 files changed

+305
-94
lines changed

18 files changed

+305
-94
lines changed

api/src/main/java/io/grpc/ClientStreamTracer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ public void inboundHeaders() {
7979
public void inboundTrailers(Metadata trailers) {
8080
}
8181

82+
/**
83+
* Information providing context to the call became available.
84+
*/
85+
@Internal
86+
public void addOptionalLabel(String key, String value) {
87+
}
88+
8289
/**
8390
* Factory class for {@link ClientStreamTracer}.
8491
*/

api/src/main/java/io/grpc/LoadBalancer.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,29 @@ public abstract static class PickSubchannelArgs {
490490
* @since 1.2.0
491491
*/
492492
public abstract MethodDescriptor<?, ?> getMethodDescriptor();
493+
494+
/**
495+
* Gets an object that can be informed about what sort of pick was made.
496+
*/
497+
@Internal
498+
public PickDetailsConsumer getPickDetailsConsumer() {
499+
return new PickDetailsConsumer() {};
500+
}
501+
}
502+
503+
/** Receives information about the pick being chosen. */
504+
@Internal
505+
public interface PickDetailsConsumer {
506+
/**
507+
* Optional labels that provide context of how the pick was routed. Particularly helpful for
508+
* per-RPC metrics.
509+
*
510+
* @throws NullPointerException if key or value is {@code null}
511+
*/
512+
default void addOptionalLabel(String key, String value) {
513+
checkNotNull(key, "key");
514+
checkNotNull(value, "value");
515+
}
493516
}
494517

495518
/**
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import com.google.common.base.Preconditions;
20+
import io.grpc.CallOptions;
21+
import io.grpc.LoadBalancer.PickSubchannelArgs;
22+
import io.grpc.Metadata;
23+
import io.grpc.MethodDescriptor;
24+
import org.mockito.ArgumentMatcher;
25+
import org.mockito.ArgumentMatchers;
26+
27+
/**
28+
* Mockito Matcher for {@link PickSubchannelArgs}.
29+
*/
30+
public final class PickSubchannelArgsMatcher implements ArgumentMatcher<PickSubchannelArgs> {
31+
private final MethodDescriptor<?, ?> method;
32+
private final Metadata headers;
33+
private final CallOptions callOptions;
34+
35+
public PickSubchannelArgsMatcher(
36+
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
37+
this.method = Preconditions.checkNotNull(method, "method");
38+
this.headers = Preconditions.checkNotNull(headers, "headers");
39+
this.callOptions = Preconditions.checkNotNull(callOptions, "callOptions");
40+
}
41+
42+
@Override
43+
public boolean matches(PickSubchannelArgs args) {
44+
return args != null
45+
&& method.equals(args.getMethodDescriptor())
46+
&& headers.equals(args.getHeaders())
47+
&& callOptions.equals(args.getCallOptions());
48+
}
49+
50+
@Override
51+
public final String toString() {
52+
return "[method=" + method + " headers=" + headers + " callOptions=" + callOptions + "]";
53+
}
54+
55+
public static PickSubchannelArgs eqPickSubchannelArgs(
56+
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
57+
return ArgumentMatchers.argThat(new PickSubchannelArgsMatcher(method, headers, callOptions));
58+
}
59+
}

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ public final ClientStream newStream(
137137
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
138138
ClientStreamTracer[] tracers) {
139139
try {
140-
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
140+
PickSubchannelArgs args = new PickSubchannelArgsImpl(
141+
method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
141142
SubchannelPicker picker = null;
142143
long pickerVersion = -1;
143144
while (true) {

core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public void inboundTrailers(Metadata trailers) {
5454
delegate().inboundTrailers(trailers);
5555
}
5656

57+
@Override
58+
public void addOptionalLabel(String key, String value) {
59+
delegate().addOptionalLabel(key, value);
60+
}
61+
5762
@Override
5863
public void streamClosed(Status status) {
5964
delegate().streamClosed(status);

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ public Result selectConfig(PickSubchannelArgs args) {
158158
throw new IllegalStateException("Resolution is pending");
159159
}
160160
};
161+
private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
162+
new LoadBalancer.PickDetailsConsumer() {};
161163

162164
private final InternalLogId logId;
163165
private final String target;
@@ -519,11 +521,11 @@ public ClientStream newStream(
519521
final Metadata headers,
520522
final Context context) {
521523
if (!retryEnabled) {
522-
ClientTransport transport =
523-
getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
524-
Context origContext = context.attach();
525524
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
526525
callOptions, headers, 0, /* isTransparentRetry= */ false);
526+
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
527+
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
528+
Context origContext = context.attach();
527529
try {
528530
return transport.newStream(method, headers, callOptions, tracers);
529531
} finally {
@@ -566,8 +568,8 @@ ClientStream newSubstream(
566568
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
567569
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
568570
newOptions, newHeaders, previousAttempts, isTransparentRetry);
569-
ClientTransport transport =
570-
getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
571+
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
572+
method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
571573
Context origContext = context.attach();
572574
try {
573575
return transport.newStream(method, newHeaders, newOptions, tracers);
@@ -1207,7 +1209,8 @@ protected ClientCall<ReqT, RespT> delegate() {
12071209
@SuppressWarnings("unchecked")
12081210
@Override
12091211
public void start(Listener<RespT> observer, Metadata headers) {
1210-
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1212+
PickSubchannelArgs args =
1213+
new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
12111214
InternalConfigSelector.Result result = configSelector.selectConfig(args);
12121215
Status status = result.getStatus();
12131216
if (!status.isOk()) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.internal;
18+
19+
import com.google.common.base.Preconditions;
20+
import io.grpc.ClientStreamTracer;
21+
import io.grpc.LoadBalancer.PickDetailsConsumer;
22+
23+
/**
24+
* Adapter for tracers into details consumers.
25+
*/
26+
final class PickDetailsConsumerImpl implements PickDetailsConsumer {
27+
private final ClientStreamTracer[] tracers;
28+
29+
/** Construct a consumer with unchanging tracers array. */
30+
public PickDetailsConsumerImpl(ClientStreamTracer[] tracers) {
31+
this.tracers = Preconditions.checkNotNull(tracers, "tracers");
32+
}
33+
34+
@Override
35+
public void addOptionalLabel(String key, String value) {
36+
Preconditions.checkNotNull(key, "key");
37+
Preconditions.checkNotNull(value, "value");
38+
for (ClientStreamTracer tracer : tracers) {
39+
tracer.addOptionalLabel(key, value);
40+
}
41+
}
42+
}

core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.base.Objects;
2222
import io.grpc.CallOptions;
23+
import io.grpc.LoadBalancer.PickDetailsConsumer;
2324
import io.grpc.LoadBalancer.PickSubchannelArgs;
2425
import io.grpc.Metadata;
2526
import io.grpc.MethodDescriptor;
@@ -29,15 +30,18 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
2930
private final CallOptions callOptions;
3031
private final Metadata headers;
3132
private final MethodDescriptor<?, ?> method;
33+
private final PickDetailsConsumer pickDetailsConsumer;
3234

3335
/**
3436
* Creates call args object for given method with its call options, metadata.
3537
*/
3638
public PickSubchannelArgsImpl(
37-
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
39+
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
40+
PickDetailsConsumer pickDetailsConsumer) {
3841
this.method = checkNotNull(method, "method");
3942
this.headers = checkNotNull(headers, "headers");
4043
this.callOptions = checkNotNull(callOptions, "callOptions");
44+
this.pickDetailsConsumer = checkNotNull(pickDetailsConsumer, "pickDetailsConsumer");
4145
}
4246

4347
@Override
@@ -55,6 +59,11 @@ public CallOptions getCallOptions() {
5559
return method;
5660
}
5761

62+
@Override
63+
public PickDetailsConsumer getPickDetailsConsumer() {
64+
return pickDetailsConsumer;
65+
}
66+
5867
@Override
5968
public boolean equals(Object o) {
6069
if (this == o) {
@@ -66,12 +75,13 @@ public boolean equals(Object o) {
6675
PickSubchannelArgsImpl that = (PickSubchannelArgsImpl) o;
6776
return Objects.equal(callOptions, that.callOptions)
6877
&& Objects.equal(headers, that.headers)
69-
&& Objects.equal(method, that.method);
78+
&& Objects.equal(method, that.method)
79+
&& Objects.equal(pickDetailsConsumer, that.pickDetailsConsumer);
7080
}
7181

7282
@Override
7383
public int hashCode() {
74-
return Objects.hashCode(callOptions, headers, method);
84+
return Objects.hashCode(callOptions, headers, method, pickDetailsConsumer);
7585
}
7686

7787
@Override

0 commit comments

Comments
 (0)