Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ public void run() {
LoadBalancingStrategy lbPolicy = lbPolicyRef.get();

for (BigtableChannelObserver info : channelInsights) {
TransportType transportType =
info.isAltsChannel()
? TransportType.TRANSPORT_TYPE_DIRECT_ACCESS
: TransportType.TRANSPORT_TYPE_CLOUD_PATH;
TransportType transportType = info.getTransportType();

long currentOutstandingUnaryRpcs = info.getOutstandingUnaryRpcs();
long currentOutstandingStreamingRpcs = info.getOutstandingStreamingRpcs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public static class SidebandData {
private static final CallOptions.Key<SidebandData> KEY =
CallOptions.Key.create("bigtable-sideband");

@Nullable
public static SidebandData from(CallOptions callOptions) {
return callOptions.getOption(KEY);
}

private static final Metadata.Key<String> SERVER_TIMING_HEADER_KEY =
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.gaxx.grpc;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.PeerInfo;

/** Provides observability about a single channel in the channel pool. */
@InternalApi
Expand All @@ -32,5 +33,5 @@ public interface BigtableChannelObserver {
/** Get the current number of successful requests since the last observed period */
long getAndResetSuccessCount();

boolean isAltsChannel();
PeerInfo.TransportType getTransportType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.ChannelFactory;
import com.google.bigtable.v2.PeerInfo;
import com.google.cloud.bigtable.data.v2.stub.MetadataExtractorInterceptor;
import com.google.cloud.bigtable.gaxx.grpc.ChannelPoolHealthChecker.ProbeResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand All @@ -34,6 +36,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -543,9 +546,8 @@ static class Entry implements BigtableChannelObserver {
* outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to
* start.
*/
@VisibleForTesting final AtomicReference<Boolean> isAltsHolder = new AtomicReference<>(null);

@VisibleForTesting final AtomicInteger errorCount = new AtomicInteger(0);

@VisibleForTesting final AtomicInteger successCount = new AtomicInteger(0);
@VisibleForTesting final AtomicInteger outstandingUnaryRpcs = new AtomicInteger(0);

Expand All @@ -554,6 +556,10 @@ static class Entry implements BigtableChannelObserver {
private final AtomicInteger maxOutstandingUnaryRpcs = new AtomicInteger();
private final AtomicInteger maxOutstandingStreamingRpcs = new AtomicInteger();

/** this contains the PeerInfo field of the most recent rpc on this channel entry. */
@VisibleForTesting
volatile PeerInfo.TransportType transportType = PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN;

/** Queue storing the last 5 minutes of probe results */
@VisibleForTesting
final ConcurrentLinkedQueue<ProbeResult> probeHistory = new ConcurrentLinkedQueue<>();
Expand All @@ -576,10 +582,18 @@ static class Entry implements BigtableChannelObserver {
this.channel = channel;
}

void checkAndSetIsAlts(ClientCall<?, ?> call) {
// TODO(populate ALTS holder)
boolean result = false;
isAltsHolder.compareAndSet(null, result);
void setTransportType(CallOptions callOptions) {
MetadataExtractorInterceptor.SidebandData sidebandData =
MetadataExtractorInterceptor.SidebandData.from(callOptions);

// Set to the specific transport type if present, otherwise default to UNKNOWN
// we could check the Status and set it to unknown, but we might have PeerInfo with some non
// OK Status
transportType =
Optional.ofNullable(sidebandData)
.map(MetadataExtractorInterceptor.SidebandData::getPeerInfo)
.map(PeerInfo::getTransportType)
.orElse(PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN);
}

ManagedChannel getManagedChannel() {
Expand Down Expand Up @@ -683,9 +697,8 @@ public long getAndResetSuccessCount() {
}

@Override
public boolean isAltsChannel() {
Boolean val = isAltsHolder.get();
return val != null && val;
public PeerInfo.TransportType getTransportType() {
return transportType;
}

void incrementErrorCount() {
Expand Down Expand Up @@ -717,7 +730,7 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
methodDescriptor.getType() == MethodDescriptor.MethodType.SERVER_STREAMING;
Entry entry = getRetainedEntry(index, isStreaming);
return new ReleasingClientCall<>(
entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming);
entry.channel.newCall(methodDescriptor, callOptions), entry, isStreaming, callOptions);
}
}

Expand All @@ -726,13 +739,19 @@ static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall
@Nullable private CancellationException cancellationException;
final Entry entry;
private final boolean isStreaming;
private final CallOptions callOptions;
private final AtomicBoolean wasClosed = new AtomicBoolean();
private final AtomicBoolean wasReleased = new AtomicBoolean();

public ReleasingClientCall(ClientCall<ReqT, RespT> delegate, Entry entry, boolean isStreaming) {
public ReleasingClientCall(
ClientCall<ReqT, RespT> delegate,
Entry entry,
boolean isStreaming,
CallOptions callOptions) {
super(delegate);
this.entry = entry;
this.isStreaming = isStreaming;
this.callOptions = callOptions;
}

@Override
Expand All @@ -741,10 +760,14 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
throw new IllegalStateException("Call is already cancelled", cancellationException);
}
try {
entry.checkAndSetIsAlts(delegate());

super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
super.onHeaders(headers);
entry.setTransportType(callOptions);
}

@Override
public void onClose(Status status, Metadata trailers) {
if (!wasClosed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

import com.google.bigtable.v2.PeerInfo;
import com.google.cloud.bigtable.data.v2.internal.api.InstanceName;
import com.google.cloud.bigtable.data.v2.internal.csm.MetricRegistry;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
Expand Down Expand Up @@ -56,6 +57,9 @@

@RunWith(JUnit4.class)
public class ChannelPoolMetricsTracerTest {
private static final String PROJECT_ID = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
private static final String APP_PROFILE_ID = "fake-profile";

@Rule public final MockitoRule mockito = MockitoJUnit.rule();

Expand All @@ -74,8 +78,8 @@ public void setUp() {
metricReader = InMemoryMetricReader.create();
ClientInfo clientInfo =
ClientInfo.builder()
.setInstanceName(InstanceName.of("fake-project", "fake-instance"))
.setAppProfileId("fake-profile")
.setInstanceName(InstanceName.of(PROJECT_ID, INSTANCE_ID))
.setAppProfileId(APP_PROFILE_ID)
.build();
SdkMeterProvider meterProvider =
SdkMeterProvider.builder().registerMetricReader(metricReader).build();
Expand Down Expand Up @@ -105,8 +109,10 @@ public void setUp() {
when(mockInsight2.getOutstandingStreamingRpcs()).thenReturn(0);
when(mockInsight2.getAndResetErrorCount()).thenReturn(0L);
when(mockInsight2.getAndResetSuccessCount()).thenReturn(0L);
when(mockInsight1.isAltsChannel()).thenReturn(false);
when(mockInsight2.isAltsChannel()).thenReturn(false);
when(mockInsight1.getTransportType())
.thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH);
when(mockInsight2.getTransportType())
.thenReturn(PeerInfo.TransportType.TRANSPORT_TYPE_CLOUD_PATH);
}

/** Helper to run the captured ChannelPoolMetricsTracer task. */
Expand All @@ -119,11 +125,24 @@ void runTrackerTask() {
}

private Attributes getExpectedErrorAttributes() {
return Attributes.builder().build();
return Attributes.builder()
.put(AttributeKey.stringKey("project_id"), PROJECT_ID)
.put(AttributeKey.stringKey("instance"), INSTANCE_ID)
.put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID)
.put(
AttributeKey.stringKey("client_name"),
"java-bigtable/" + com.google.cloud.bigtable.Version.VERSION)
.build();
}

private static Attributes getExpectedRpcAttributes(String lbPolicy, boolean streaming) {
return Attributes.builder()
.put(AttributeKey.stringKey("project_id"), PROJECT_ID)
.put(AttributeKey.stringKey("instance"), INSTANCE_ID)
.put(AttributeKey.stringKey("app_profile"), APP_PROFILE_ID)
.put(
AttributeKey.stringKey("client_name"),
"java-bigtable/" + com.google.cloud.bigtable.Version.VERSION)
.put(AttributeKey.stringKey("transport_type"), "cloudpath")
.put(AttributeKey.stringKey("lb_policy"), lbPolicy)
.put(AttributeKey.booleanKey("streaming"), streaming)
Expand All @@ -147,15 +166,6 @@ private static HistogramPointData getPointForStreaming(
() -> new AssertionError("Missing HistogramPointData for streaming=" + streaming));
}

/** Helper to create expected Attributes for assertions. */
private static Attributes getExpectedAttributes(String lbPolicy, boolean streaming) {
return Attributes.builder()
.put(AttributeKey.stringKey("transport_type"), "grpc")
.put(AttributeKey.stringKey("lb_policy"), lbPolicy)
.put(AttributeKey.booleanKey("streaming"), streaming)
.build();
}

@Test
public void testSingleRun() {
// Arrange
Expand Down
Loading