diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracer.java index 0eb9242b77..ec132e4dad 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracer.java @@ -84,10 +84,7 @@ public void run() { LoadBalancingStrategy lbPolicy = lbPolicyRef.get(); for (BigtableChannelObserver info : channelInsights) { - TransportType transportType = - info.isAltsChannel() - ? TransportType.TRANSPORT_TYPE_DIRECT_ACCESS - : TransportType.TRANSPORT_TYPE_CLOUD_PATH; + TransportType transportType = info.getTransportType(); long currentOutstandingUnaryRpcs = info.getOutstandingUnaryRpcs(); long currentOutstandingStreamingRpcs = info.getOutstandingStreamingRpcs(); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java index 14ad73131f..4912ddcc36 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/MetadataExtractorInterceptor.java @@ -35,12 +35,15 @@ import io.grpc.alts.AltsContextUtil; import java.time.Duration; import java.util.Base64; +import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; @InternalApi public class MetadataExtractorInterceptor implements ClientInterceptor { + static final Logger LOG = Logger.getLogger(MetadataExtractorInterceptor.class.getName()); + private final SidebandData sidebandData = new SidebandData(); public GrpcCallContext injectInto(GrpcCallContext ctx) { @@ -88,6 +91,11 @@ public static class SidebandData { private static final CallOptions.Key KEY = CallOptions.Key.create("bigtable-sideband"); + @Nullable + public static SidebandData from(CallOptions callOptions) { + return callOptions.getOption(KEY); + } + private static final Metadata.Key SERVER_TIMING_HEADER_KEY = Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER); private static final Pattern SERVER_TIMING_HEADER_PATTERN = @@ -123,9 +131,14 @@ private void reset() { } void onResponseHeaders(Metadata md, Attributes attributes) { + LOG.info("Raw Response Metadata: " + md); + peerInfo = extractPeerInfo(md, gfeTiming, attributes); + // 3. Log the final resolved PeerInfo responseParams = extractResponseParams(md); gfeTiming = extractGfeLatency(md); + LOG.info("Extracted GFE Timing (server-timing): " + gfeTiming); peerInfo = extractPeerInfo(md, gfeTiming, attributes); + LOG.info("Final Resolved PeerInfo: " + peerInfo); } void onClose(Status status, Metadata trailers) { @@ -152,7 +165,9 @@ private static Duration extractGfeLatency(Metadata metadata) { private static PeerInfo extractPeerInfo( Metadata metadata, Duration gfeTiming, Attributes attributes) { String encodedStr = metadata.get(PEER_INFO_KEY); + LOG.info("Raw bigtable-peer-info header value: " + encodedStr); if (Strings.isNullOrEmpty(encodedStr)) { + LOG.info("bigtable-peer-info header was missing or empty. Returning null."); return null; } @@ -160,14 +175,18 @@ private static PeerInfo extractPeerInfo( byte[] decoded = Base64.getUrlDecoder().decode(encodedStr); PeerInfo peerInfo = PeerInfo.parseFrom(decoded); PeerInfo.TransportType effectiveTransport = peerInfo.getTransportType(); + LOG.info("Parsed TransportType straight from protobuf: " + effectiveTransport); // TODO: remove this once transport_type is being sent by the server // This is a temporary workaround to detect directpath until its available from the server if (effectiveTransport == PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN) { boolean isAlts = AltsContextUtil.check(attributes); + LOG.info("TransportType was UNKNOWN. Fallback checking ALTS context: isAlts=" + isAlts); if (isAlts) { + LOG.info("Fallback applied: Set to DIRECT_ACCESS based on ALTS."); effectiveTransport = PeerInfo.TransportType.TRANSPORT_TYPE_DIRECT_ACCESS; } else if (gfeTiming != null) { + LOG.info("Fallback applied: Set to CLOUD_PATH based on presence of gfeTiming."); effectiveTransport = PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH; } } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java index a718f5fa06..2230e14bf2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelObserver.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.gaxx.grpc; import com.google.api.core.InternalApi; +import com.google.bigtable.v2.PeerInfo; /** Provides observability about a single channel in the channel pool. */ @InternalApi @@ -32,5 +33,5 @@ public interface BigtableChannelObserver { /** Get the current number of successful requests since the last observed period */ long getAndResetSuccessCount(); - boolean isAltsChannel(); + PeerInfo.TransportType getTransportType(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java index f5f1928c2a..6bbfba1398 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/gaxx/grpc/BigtableChannelPool.java @@ -17,6 +17,8 @@ import com.google.api.core.InternalApi; import com.google.api.gax.grpc.ChannelFactory; +import com.google.bigtable.v2.PeerInfo; +import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor; import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -34,6 +36,7 @@ import java.time.Clock; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; @@ -543,9 +546,8 @@ static class Entry implements BigtableChannelObserver { * outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to * start. */ - @VisibleForTesting final AtomicReference isAltsHolder = new AtomicReference<>(null); - @VisibleForTesting final AtomicInteger errorCount = new AtomicInteger(0); + @VisibleForTesting final AtomicInteger successCount = new AtomicInteger(0); @VisibleForTesting final AtomicInteger outstandingUnaryRpcs = new AtomicInteger(0); @@ -554,6 +556,10 @@ static class Entry implements BigtableChannelObserver { private final AtomicInteger maxOutstandingUnaryRpcs = new AtomicInteger(); private final AtomicInteger maxOutstandingStreamingRpcs = new AtomicInteger(); + /** this contains the PeerInfo field of the most recent rpc on this channel entry. */ + @VisibleForTesting + volatile PeerInfo.TransportType transportType = PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN; + /** Queue storing the last 5 minutes of probe results */ @VisibleForTesting final ConcurrentLinkedQueue probeHistory = new ConcurrentLinkedQueue<>(); @@ -576,10 +582,18 @@ static class Entry implements BigtableChannelObserver { this.channel = channel; } - void checkAndSetIsAlts(ClientCall call) { - // TODO(populate ALTS holder) - boolean result = false; - isAltsHolder.compareAndSet(null, result); + void setTransportType(CallOptions callOptions) { + MetadataExtractorInterceptor.SidebandData sidebandData = + MetadataExtractorInterceptor.SidebandData.from(callOptions); + + // Set to the specific transport type if present, otherwise default to UNKNOWN + // we could check the Status and set it to unknown, but we might have PeerInfo with some non + // OK Status + transportType = + Optional.ofNullable(sidebandData) + .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) + .map(PeerInfo::getTransportType) + .orElse(PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN); } ManagedChannel getManagedChannel() { @@ -683,9 +697,8 @@ public long getAndResetSuccessCount() { } @Override - public boolean isAltsChannel() { - Boolean val = isAltsHolder.get(); - return val != null && val; + public PeerInfo.TransportType getTransportType() { + return transportType; } void incrementErrorCount() { @@ -717,7 +730,7 @@ public ClientCall newCall( methodDescriptor.getType() == MethodDescriptor.MethodType.SERVER_STREAMING; Entry entry = getRetainedEntry(index, isStreaming); return new ReleasingClientCall<>( - entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming); + entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming, callOptions); } } @@ -726,13 +739,19 @@ static class ReleasingClientCall extends SimpleForwardingClientCall @Nullable private CancellationException cancellationException; final Entry entry; private final boolean isStreaming; + private final CallOptions callOptions; private final AtomicBoolean wasClosed = new AtomicBoolean(); private final AtomicBoolean wasReleased = new AtomicBoolean(); - public ReleasingClientCall(ClientCall delegate, Entry entry, boolean isStreaming) { + public ReleasingClientCall( + ClientCall delegate, + Entry entry, + boolean isStreaming, + CallOptions callOptions) { super(delegate); this.entry = entry; this.isStreaming = isStreaming; + this.callOptions = callOptions; } @Override @@ -741,10 +760,14 @@ public void start(Listener responseListener, Metadata headers) { throw new IllegalStateException("Call is already cancelled", cancellationException); } try { - entry.checkAndSetIsAlts(delegate()); - super.start( new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onHeaders(Metadata headers) { + super.onHeaders(headers); + entry.setTransportType(callOptions); + } + @Override public void onClose(Status status, Metadata trailers) { if (!wasClosed.compareAndSet(false, true)) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracerTest.java index fec4f7956a..fca44b08de 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/ChannelPoolMetricsTracerTest.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.when; +import com.google.bigtable.v2.PeerInfo; import com.google.cloud.bigtable.data.v2.internal.api.InstanceName; import com.google.cloud.bigtable.data.v2.internal.csm.MetricRegistry; import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo; @@ -56,6 +57,9 @@ @RunWith(JUnit4.class) public class ChannelPoolMetricsTracerTest { + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String APP_PROFILE_ID = "fake-profile"; @Rule public final MockitoRule mockito = MockitoJUnit.rule(); @@ -74,8 +78,8 @@ public void setUp() { metricReader = InMemoryMetricReader.create(); ClientInfo clientInfo = ClientInfo.builder() - .setInstanceName(InstanceName.of("fake-project", "fake-instance")) - .setAppProfileId("fake-profile") + .setInstanceName(InstanceName.of(PROJECT_ID, INSTANCE_ID)) + .setAppProfileId(APP_PROFILE_ID) .build(); SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); @@ -105,8 +109,10 @@ public void setUp() { when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(0); when(mockInsight2.getAndResetErrorCount()).thenReturn(0L); when(mockInsight2.getAndResetSuccessCount()).thenReturn(0L); - when(mockInsight1.isAltsChannel()).thenReturn(false); - when(mockInsight2.isAltsChannel()).thenReturn(false); + when(mockInsight1.getTransportType()) + .thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH); + when(mockInsight2.getTransportType()) + .thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH); } /** Helper to run the captured ChannelPoolMetricsTracer task. */ @@ -119,11 +125,24 @@ void runTrackerTask() { } private Attributes getExpectedErrorAttributes() { - return Attributes.builder().build(); + return Attributes.builder() + .put(AttributeKey.stringKey("project_id"), PROJECT_ID) + .put(AttributeKey.stringKey("instance"), INSTANCE_ID) + .put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID) + .put( + AttributeKey.stringKey("client_name"), + "java-bigtable/" + com.google.cloud.bigtable.Version.VERSION) + .build(); } private static Attributes getExpectedRpcAttributes(String lbPolicy, boolean streaming) { return Attributes.builder() + .put(AttributeKey.stringKey("project_id"), PROJECT_ID) + .put(AttributeKey.stringKey("instance"), INSTANCE_ID) + .put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID) + .put( + AttributeKey.stringKey("client_name"), + "java-bigtable/" + com.google.cloud.bigtable.Version.VERSION) .put(AttributeKey.stringKey("transport_type"), "cloudpath") .put(AttributeKey.stringKey("lb_policy"), lbPolicy) .put(AttributeKey.booleanKey("streaming"), streaming) @@ -147,15 +166,6 @@ private static HistogramPointData getPointForStreaming( () -> new AssertionError("Missing HistogramPointData for streaming=" + streaming)); } - /** Helper to create expected Attributes for assertions. */ - private static Attributes getExpectedAttributes(String lbPolicy, boolean streaming) { - return Attributes.builder() - .put(AttributeKey.stringKey("transport_type"), "grpc") - .put(AttributeKey.stringKey("lb_policy"), lbPolicy) - .put(AttributeKey.booleanKey("streaming"), streaming) - .build(); - } - @Test public void testSingleRun() { // Arrange