Skip to content

Commit eba0087

Browse files
committed
fix(bigtable): plumb transport type correctly
1 parent b923871 commit eba0087

5 files changed

Lines changed: 64 additions & 31 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,7 @@ public void run() {
8484
LoadBalancingStrategy lbPolicy = lbPolicyRef.get();
8585

8686
for (BigtableChannelObserver info : channelInsights) {
87-
TransportType transportType =
88-
info.isAltsChannel()
89-
? TransportType.TRANSPORT_TYPE_DIRECT_ACCESS
90-
: TransportType.TRANSPORT_TYPE_CLOUD_PATH;
87+
TransportType transportType = info.getTransportType();
9188

9289
long currentOutstandingUnaryRpcs = info.getOutstandingUnaryRpcs();
9390
long currentOutstandingStreamingRpcs = info.getOutstandingStreamingRpcs();

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public static class SidebandData {
8888
private static final CallOptions.Key<SidebandData> KEY =
8989
CallOptions.Key.create("bigtable-sideband");
9090

91+
@Nullable
92+
public static SidebandData from(CallOptions callOptions) {
93+
return callOptions == null ? null : callOptions.getOption(KEY);
94+
}
95+
9196
private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
9297
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
9398
private static final Pattern SERVER_TIMING_HEADER_PATTERN =

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.bigtable.gaxx.grpc;
1717

1818
import com.google.api.core.InternalApi;
19+
import com.google.bigtable.v2.PeerInfo;
1920

2021
/** Provides observability about a single channel in the channel pool. */
2122
@InternalApi
@@ -32,5 +33,5 @@ public interface BigtableChannelObserver {
3233
/** Get the current number of successful requests since the last observed period */
3334
long getAndResetSuccessCount();
3435

35-
boolean isAltsChannel();
36+
PeerInfo.TransportType getTransportType();
3637
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.grpc.ChannelFactory;
20+
import com.google.bigtable.v2.PeerInfo;
21+
import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor;
2022
import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult;
2123
import com.google.common.annotations.VisibleForTesting;
2224
import com.google.common.base.Preconditions;
@@ -543,7 +545,9 @@ static class Entry implements BigtableChannelObserver {
543545
* outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to
544546
* start.
545547
*/
546-
@VisibleForTesting final AtomicReference<Boolean> isAltsHolder = new AtomicReference<>(null);
548+
@VisibleForTesting
549+
final AtomicReference<com.google.bigtable.v2.PeerInfo.TransportType> transportChannelHolder =
550+
new AtomicReference<>(com.google.bigtable.v2.PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN);
547551

548552
@VisibleForTesting final AtomicInteger errorCount = new AtomicInteger(0);
549553
@VisibleForTesting final AtomicInteger successCount = new AtomicInteger(0);
@@ -576,10 +580,22 @@ static class Entry implements BigtableChannelObserver {
576580
this.channel = channel;
577581
}
578582

579-
void checkAndSetIsAlts(ClientCall<?, ?> call) {
580-
// TODO(populate ALTS holder)
581-
boolean result = false;
582-
isAltsHolder.compareAndSet(null, result);
583+
void checkAndSetTransportType(Status status, CallOptions callOptions) {
584+
// set to UNKNOWN if error
585+
if (!status.isOk()) {
586+
transportChannelHolder.set(
587+
com.google.bigtable.v2.PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN);
588+
return;
589+
}
590+
MetadataExtractorInterceptor.SidebandData sidebandData =
591+
MetadataExtractorInterceptor.SidebandData.from(callOptions);
592+
593+
if (sidebandData != null) {
594+
com.google.bigtable.v2.PeerInfo peerInfo = sidebandData.getPeerInfo();
595+
if (peerInfo != null) {
596+
transportChannelHolder.set(peerInfo.getTransportType());
597+
}
598+
}
583599
}
584600

585601
ManagedChannel getManagedChannel() {
@@ -683,9 +699,8 @@ public long getAndResetSuccessCount() {
683699
}
684700

685701
@Override
686-
public boolean isAltsChannel() {
687-
Boolean val = isAltsHolder.get();
688-
return val != null && val;
702+
public PeerInfo.TransportType getTransportType() {
703+
return transportChannelHolder.get();
689704
}
690705

691706
void incrementErrorCount() {
@@ -717,7 +732,7 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
717732
methodDescriptor.getType() == MethodDescriptor.MethodType.SERVER_STREAMING;
718733
Entry entry = getRetainedEntry(index, isStreaming);
719734
return new ReleasingClientCall<>(
720-
entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming);
735+
entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming, callOptions);
721736
}
722737
}
723738

@@ -726,13 +741,19 @@ static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall
726741
@Nullable private CancellationException cancellationException;
727742
final Entry entry;
728743
private final boolean isStreaming;
744+
private final CallOptions callOptions;
729745
private final AtomicBoolean wasClosed = new AtomicBoolean();
730746
private final AtomicBoolean wasReleased = new AtomicBoolean();
731747

732-
public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry, boolean isStreaming) {
748+
public ReleasingClientCall(
749+
ClientCall<ReqT, RespT> delegate,
750+
Entry entry,
751+
boolean isStreaming,
752+
CallOptions callOptions) {
733753
super(delegate);
734754
this.entry = entry;
735755
this.isStreaming = isStreaming;
756+
this.callOptions = callOptions;
736757
}
737758

738759
@Override
@@ -741,12 +762,11 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
741762
throw new IllegalStateException("Call is already cancelled", cancellationException);
742763
}
743764
try {
744-
entry.checkAndSetIsAlts(delegate());
745-
746765
super.start(
747766
new SimpleForwardingClientCallListener<RespT>(responseListener) {
748767
@Override
749768
public void onClose(Status status, Metadata trailers) {
769+
entry.checkAndSetTransportType(status, callOptions);
750770
if (!wasClosed.compareAndSet(false, true)) {
751771
LOG.log(
752772
Level.WARNING,

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracerTest.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.ArgumentMatchers.anyLong;
2121
import static org.mockito.Mockito.when;
2222

23+
import com.google.bigtable.v2.PeerInfo;
2324
import com.google.cloud.bigtable.data.v2.internal.api.InstanceName;
2425
import com.google.cloud.bigtable.data.v2.internal.csm.MetricRegistry;
2526
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
@@ -56,6 +57,9 @@
5657

5758
@RunWith(JUnit4.class)
5859
public class ChannelPoolMetricsTracerTest {
60+
private static final String PROJECT_ID = "fake-project";
61+
private static final String INSTANCE_ID = "fake-instance";
62+
private static final String APP_PROFILE_ID = "fake-profile";
5963

6064
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
6165

@@ -74,8 +78,8 @@ public void setUp() {
7478
metricReader = InMemoryMetricReader.create();
7579
ClientInfo clientInfo =
7680
ClientInfo.builder()
77-
.setInstanceName(InstanceName.of("fake-project", "fake-instance"))
78-
.setAppProfileId("fake-profile")
81+
.setInstanceName(InstanceName.of(PROJECT_ID, INSTANCE_ID))
82+
.setAppProfileId(APP_PROFILE_ID)
7983
.build();
8084
SdkMeterProvider meterProvider =
8185
SdkMeterProvider.builder().registerMetricReader(metricReader).build();
@@ -105,8 +109,10 @@ public void setUp() {
105109
when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(0);
106110
when(mockInsight2.getAndResetErrorCount()).thenReturn(0L);
107111
when(mockInsight2.getAndResetSuccessCount()).thenReturn(0L);
108-
when(mockInsight1.isAltsChannel()).thenReturn(false);
109-
when(mockInsight2.isAltsChannel()).thenReturn(false);
112+
when(mockInsight1.getTransportType())
113+
.thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH);
114+
when(mockInsight2.getTransportType())
115+
.thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH);
110116
}
111117

112118
/** Helper to run the captured ChannelPoolMetricsTracer task. */
@@ -119,11 +125,24 @@ void runTrackerTask() {
119125
}
120126

121127
private Attributes getExpectedErrorAttributes() {
122-
return Attributes.builder().build();
128+
return Attributes.builder()
129+
.put(AttributeKey.stringKey("project_id"), PROJECT_ID)
130+
.put(AttributeKey.stringKey("instance"), INSTANCE_ID)
131+
.put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID)
132+
.put(
133+
AttributeKey.stringKey("client_name"),
134+
"java-bigtable/" + com.google.cloud.bigtable.Version.VERSION)
135+
.build();
123136
}
124137

125138
private static Attributes getExpectedRpcAttributes(String lbPolicy, boolean streaming) {
126139
return Attributes.builder()
140+
.put(AttributeKey.stringKey("project_id"), PROJECT_ID)
141+
.put(AttributeKey.stringKey("instance"), INSTANCE_ID)
142+
.put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID)
143+
.put(
144+
AttributeKey.stringKey("client_name"),
145+
"java-bigtable/" + com.google.cloud.bigtable.Version.VERSION)
127146
.put(AttributeKey.stringKey("transport_type"), "cloudpath")
128147
.put(AttributeKey.stringKey("lb_policy"), lbPolicy)
129148
.put(AttributeKey.booleanKey("streaming"), streaming)
@@ -147,15 +166,6 @@ private static HistogramPointData getPointForStreaming(
147166
() -> new AssertionError("Missing HistogramPointData for streaming=" + streaming));
148167
}
149168

150-
/** Helper to create expected Attributes for assertions. */
151-
private static Attributes getExpectedAttributes(String lbPolicy, boolean streaming) {
152-
return Attributes.builder()
153-
.put(AttributeKey.stringKey("transport_type"), "grpc")
154-
.put(AttributeKey.stringKey("lb_policy"), lbPolicy)
155-
.put(AttributeKey.booleanKey("streaming"), streaming)
156-
.build();
157-
}
158-
159169
@Test
160170
public void testSingleRun() {
161171
// Arrange

0 commit comments

Comments
 (0)