Skip to content

Commit d25f39c

Browse files
fix(bigtable): plumb transport type correctly (#2824)
* fix(bigtable): plumb transport type correctly * fix * fix * fix * fix * fix * chore: generate libraries at Mon Mar 2 19:46:43 UTC 2026 * fix * fix --------- Co-authored-by: cloud-java-bot <cloud-java-bot@google.com>
1 parent 915733b commit d25f39c

5 files changed

Lines changed: 68 additions & 32 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,7 @@ public void run() {
8484
LoadBalancingStrategy lbPolicy = lbPolicyRef.get();
8585

8686
for (BigtableChannelObserver info : channelInsights) {
87-
TransportType transportType =
88-
info.isAltsChannel()
89-
? TransportType.TRANSPORT_TYPE_DIRECT_ACCESS
90-
: TransportType.TRANSPORT_TYPE_CLOUD_PATH;
87+
TransportType transportType = info.getTransportType();
9188

9289
long currentOutstandingUnaryRpcs = info.getOutstandingUnaryRpcs();
9390
long currentOutstandingStreamingRpcs = info.getOutstandingStreamingRpcs();

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public static class SidebandData {
8888
private static final CallOptions.Key<SidebandData> KEY =
8989
CallOptions.Key.create("bigtable-sideband");
9090

91+
@Nullable
92+
public static SidebandData from(CallOptions callOptions) {
93+
return callOptions.getOption(KEY);
94+
}
95+
9196
private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
9297
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
9398
private static final Pattern SERVER_TIMING_HEADER_PATTERN =

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.bigtable.gaxx.grpc;
1717

1818
import com.google.api.core.InternalApi;
19+
import com.google.bigtable.v2.PeerInfo;
1920

2021
/** Provides observability about a single channel in the channel pool. */
2122
@InternalApi
@@ -32,5 +33,5 @@ public interface BigtableChannelObserver {
3233
/** Get the current number of successful requests since the last observed period */
3334
long getAndResetSuccessCount();
3435

35-
boolean isAltsChannel();
36+
PeerInfo.TransportType getTransportType();
3637
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import com.google.api.core.InternalApi;
1919
import com.google.api.gax.grpc.ChannelFactory;
20+
import com.google.bigtable.v2.PeerInfo;
21+
import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor;
2022
import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult;
2123
import com.google.common.annotations.VisibleForTesting;
2224
import com.google.common.base.Preconditions;
@@ -34,6 +36,7 @@
3436
import java.time.Clock;
3537
import java.util.ArrayList;
3638
import java.util.List;
39+
import java.util.Optional;
3740
import java.util.Random;
3841
import java.util.concurrent.CancellationException;
3942
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -543,9 +546,8 @@ static class Entry implements BigtableChannelObserver {
543546
* outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to
544547
* start.
545548
*/
546-
@VisibleForTesting final AtomicReference<Boolean> isAltsHolder = new AtomicReference<>(null);
547-
548549
@VisibleForTesting final AtomicInteger errorCount = new AtomicInteger(0);
550+
549551
@VisibleForTesting final AtomicInteger successCount = new AtomicInteger(0);
550552
@VisibleForTesting final AtomicInteger outstandingUnaryRpcs = new AtomicInteger(0);
551553

@@ -554,6 +556,10 @@ static class Entry implements BigtableChannelObserver {
554556
private final AtomicInteger maxOutstandingUnaryRpcs = new AtomicInteger();
555557
private final AtomicInteger maxOutstandingStreamingRpcs = new AtomicInteger();
556558

559+
/** this contains the PeerInfo field of the most recent rpc on this channel entry. */
560+
@VisibleForTesting
561+
volatile PeerInfo.TransportType transportType = PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN;
562+
557563
/** Queue storing the last 5 minutes of probe results */
558564
@VisibleForTesting
559565
final ConcurrentLinkedQueue<ProbeResult> probeHistory = new ConcurrentLinkedQueue<>();
@@ -576,10 +582,18 @@ static class Entry implements BigtableChannelObserver {
576582
this.channel = channel;
577583
}
578584

579-
void checkAndSetIsAlts(ClientCall<?, ?> call) {
580-
// TODO(populate ALTS holder)
581-
boolean result = false;
582-
isAltsHolder.compareAndSet(null, result);
585+
void setTransportType(CallOptions callOptions) {
586+
MetadataExtractorInterceptor.SidebandData sidebandData =
587+
MetadataExtractorInterceptor.SidebandData.from(callOptions);
588+
589+
// Set to the specific transport type if present, otherwise default to UNKNOWN
590+
// we could check the Status and set it to unknown, but we might have PeerInfo with some non
591+
// OK Status
592+
transportType =
593+
Optional.ofNullable(sidebandData)
594+
.map(MetadataExtractorInterceptor.SidebandData::getPeerInfo)
595+
.map(PeerInfo::getTransportType)
596+
.orElse(PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN);
583597
}
584598

585599
ManagedChannel getManagedChannel() {
@@ -683,9 +697,8 @@ public long getAndResetSuccessCount() {
683697
}
684698

685699
@Override
686-
public boolean isAltsChannel() {
687-
Boolean val = isAltsHolder.get();
688-
return val != null && val;
700+
public PeerInfo.TransportType getTransportType() {
701+
return transportType;
689702
}
690703

691704
void incrementErrorCount() {
@@ -717,7 +730,7 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
717730
methodDescriptor.getType() == MethodDescriptor.MethodType.SERVER_STREAMING;
718731
Entry entry = getRetainedEntry(index, isStreaming);
719732
return new ReleasingClientCall<>(
720-
entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming);
733+
entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming, callOptions);
721734
}
722735
}
723736

@@ -726,13 +739,19 @@ static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall
726739
@Nullable private CancellationException cancellationException;
727740
final Entry entry;
728741
private final boolean isStreaming;
742+
private final CallOptions callOptions;
729743
private final AtomicBoolean wasClosed = new AtomicBoolean();
730744
private final AtomicBoolean wasReleased = new AtomicBoolean();
731745

732-
public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry, boolean isStreaming) {
746+
public ReleasingClientCall(
747+
ClientCall<ReqT, RespT> delegate,
748+
Entry entry,
749+
boolean isStreaming,
750+
CallOptions callOptions) {
733751
super(delegate);
734752
this.entry = entry;
735753
this.isStreaming = isStreaming;
754+
this.callOptions = callOptions;
736755
}
737756

738757
@Override
@@ -741,10 +760,14 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
741760
throw new IllegalStateException("Call is already cancelled", cancellationException);
742761
}
743762
try {
744-
entry.checkAndSetIsAlts(delegate());
745-
746763
super.start(
747764
new SimpleForwardingClientCallListener<RespT>(responseListener) {
765+
@Override
766+
public void onHeaders(Metadata headers) {
767+
super.onHeaders(headers);
768+
entry.setTransportType(callOptions);
769+
}
770+
748771
@Override
749772
public void onClose(Status status, Metadata trailers) {
750773
if (!wasClosed.compareAndSet(false, true)) {

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracerTest.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.ArgumentMatchers.anyLong;
2121
import static org.mockito.Mockito.when;
2222

23+
import com.google.bigtable.v2.PeerInfo;
2324
import com.google.cloud.bigtable.data.v2.internal.api.InstanceName;
2425
import com.google.cloud.bigtable.data.v2.internal.csm.MetricRegistry;
2526
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
@@ -56,6 +57,9 @@
5657

5758
@RunWith(JUnit4.class)
5859
public class ChannelPoolMetricsTracerTest {
60+
private static final String PROJECT_ID = "fake-project";
61+
private static final String INSTANCE_ID = "fake-instance";
62+
private static final String APP_PROFILE_ID = "fake-profile";
5963

6064
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
6165

@@ -74,8 +78,8 @@ public void setUp() {
7478
metricReader = InMemoryMetricReader.create();
7579
ClientInfo clientInfo =
7680
ClientInfo.builder()
77-
.setInstanceName(InstanceName.of("fake-project", "fake-instance"))
78-
.setAppProfileId("fake-profile")
81+
.setInstanceName(InstanceName.of(PROJECT_ID, INSTANCE_ID))
82+
.setAppProfileId(APP_PROFILE_ID)
7983
.build();
8084
SdkMeterProvider meterProvider =
8185
SdkMeterProvider.builder().registerMetricReader(metricReader).build();
@@ -105,8 +109,10 @@ public void setUp() {
105109
when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(0);
106110
when(mockInsight2.getAndResetErrorCount()).thenReturn(0L);
107111
when(mockInsight2.getAndResetSuccessCount()).thenReturn(0L);
108-
when(mockInsight1.isAltsChannel()).thenReturn(false);
109-
when(mockInsight2.isAltsChannel()).thenReturn(false);
112+
when(mockInsight1.getTransportType())
113+
.thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH);
114+
when(mockInsight2.getTransportType())
115+
.thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH);
110116
}
111117

112118
/** Helper to run the captured ChannelPoolMetricsTracer task. */
@@ -119,11 +125,24 @@ void runTrackerTask() {
119125
}
120126

121127
private Attributes getExpectedErrorAttributes() {
122-
return Attributes.builder().build();
128+
return Attributes.builder()
129+
.put(AttributeKey.stringKey("project_id"), PROJECT_ID)
130+
.put(AttributeKey.stringKey("instance"), INSTANCE_ID)
131+
.put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID)
132+
.put(
133+
AttributeKey.stringKey("client_name"),
134+
"java-bigtable/" + com.google.cloud.bigtable.Version.VERSION)
135+
.build();
123136
}
124137

125138
private static Attributes getExpectedRpcAttributes(String lbPolicy, boolean streaming) {
126139
return Attributes.builder()
140+
.put(AttributeKey.stringKey("project_id"), PROJECT_ID)
141+
.put(AttributeKey.stringKey("instance"), INSTANCE_ID)
142+
.put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID)
143+
.put(
144+
AttributeKey.stringKey("client_name"),
145+
"java-bigtable/" + com.google.cloud.bigtable.Version.VERSION)
127146
.put(AttributeKey.stringKey("transport_type"), "cloudpath")
128147
.put(AttributeKey.stringKey("lb_policy"), lbPolicy)
129148
.put(AttributeKey.booleanKey("streaming"), streaming)
@@ -147,15 +166,6 @@ private static HistogramPointData getPointForStreaming(
147166
() -> new AssertionError("Missing HistogramPointData for streaming=" + streaming));
148167
}
149168

150-
/** Helper to create expected Attributes for assertions. */
151-
private static Attributes getExpectedAttributes(String lbPolicy, boolean streaming) {
152-
return Attributes.builder()
153-
.put(AttributeKey.stringKey("transport_type"), "grpc")
154-
.put(AttributeKey.stringKey("lb_policy"), lbPolicy)
155-
.put(AttributeKey.booleanKey("streaming"), streaming)
156-
.build();
157-
}
158-
159169
@Test
160170
public void testSingleRun() {
161171
// Arrange

0 commit comments

Comments
 (0)