Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,8 @@
<bigtable.data-endpoint>${bigtable.directpath-data-endpoint}</bigtable.data-endpoint>
<bigtable.admin-endpoint>${bigtable.directpath-admin-endpoint}</bigtable.admin-endpoint>
<bigtable.grpc-log-dir>${project.build.directory}/test-grpc-logs/directpath-it</bigtable.grpc-log-dir>
<bigtable.attempt-directpath>true</bigtable.attempt-directpath>
</systemPropertyVariables>
<!-- Enable directpath for bigtable -->
<environmentVariables>
<GOOGLE_CLOUD_ENABLE_DIRECT_PATH>bigtable</GOOGLE_CLOUD_ENABLE_DIRECT_PATH>
Comment thread
kolea2 marked this conversation as resolved.
</environmentVariables>
<includes>
<!-- TODO(igorbernstein): Once the control plane is accessible via directpath, add admin tests -->
<include>com.google.cloud.bigtable.data.v2.it.*IT</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private static final int MAX_MESSAGE_SIZE = 256 * 1024 * 1024;
private static final String SERVER_DEFAULT_APP_PROFILE_ID = "";

// TODO(igorbernstein2): Remove this once DirectPath goes to public beta
// Temporary endpoint for the DirectPath private alpha
private static final String DIRECT_PATH_ENV_VAR = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH";
private static final String DIRECT_PATH_ENDPOINT = "directpath-bigtable.googleapis.com:443";
// TODO(weiranf): Remove this temporary endpoint once DirectPath goes to public beta
private static final String DIRECT_PATH_ENDPOINT =
"testdirectpath-bigtable.sandbox.googleapis.com:443";

private static final Set<Code> IDEMPOTENT_RETRY_CODES =
ImmutableSet.of(Code.DEADLINE_EXCEEDED, Code.UNAVAILABLE);
Expand Down Expand Up @@ -231,7 +230,14 @@ public boolean isRefreshingChannel() {
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
.setPoolSize(getDefaultChannelPoolSize())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE);
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
// TODO(weiranf): Set this to true by default once DirectPath goes to public beta
.setAttemptDirectPath(isDirectPathEnabled());
}

// TODO(weiranf): Remove this once DirectPath goes to public beta
private static boolean isDirectPathEnabled() {
return Boolean.getBoolean("bigtable.attempt-directpath");
Comment thread
WeiranFang marked this conversation as resolved.
}

static int getDefaultChannelPoolSize() {
Expand Down Expand Up @@ -504,7 +510,7 @@ private Builder() {
// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();

// TODO(igorbernstein): remove this once DirectPath goes to public Beta and uses the default
// TODO(weiranf): remove this once DirectPath goes to public Beta and uses the default
// endpoint.
if (isDirectPathEnabled()) {
setEndpoint(DIRECT_PATH_ENDPOINT);
Expand Down Expand Up @@ -626,22 +632,6 @@ private static void copyRetrySettings(
dest.setRetryableCodes(source.getRetryableCodes());
dest.setRetrySettings(source.getRetrySettings());
}

// TODO(igorbernstein): Remove this once DirectPath goes to public beta
// Extracted from InstantiatingGrpcChannelProvider#isDirectPathEnabled
private static boolean isDirectPathEnabled() {
String whiteList = System.getenv(DIRECT_PATH_ENV_VAR);
if (whiteList == null) {
return false;
}

for (String service : whiteList.split(",")) {
if (!service.isEmpty() && DIRECT_PATH_ENDPOINT.contains(service)) {
return true;
}
}
return false;
}
// </editor-fold>

// <editor-fold desc="Public API">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import io.grpc.netty.shaded.io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,11 +67,15 @@ public class DirectPathFallbackIT {
private static final int MIN_COMPLETE_READ_CALLS = 40;
private static final int NUM_RPCS_TO_SEND = 20;

// IP address prefixes allocated for DirectPath backends.
private static final String DP_IPV6_PREFIX = "2001:4860:8040";
Comment thread
WeiranFang marked this conversation as resolved.
private static final String DP_IPV4_PREFIX = "34.126";

@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

private AtomicBoolean blackholeIPv6 = new AtomicBoolean();
private AtomicBoolean blackholeDpAddr = new AtomicBoolean();
private AtomicInteger numBlocked = new AtomicInteger();
private AtomicInteger numIPv6Read = new AtomicInteger();
private AtomicInteger numDpAddrRead = new AtomicInteger();

private ChannelFactory<NioSocketChannel> channelFactory;
private EventLoopGroup eventLoopGroup;
Expand All @@ -97,6 +101,7 @@ public void setup() throws IOException {
InstantiatingGrpcChannelProvider instrumentedTransportChannelProvider =
defaultTransportProvider
.toBuilder()
.setAttemptDirectPath(true)
.setPoolSize(1)
.setChannelConfigurator(
new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
Expand Down Expand Up @@ -140,8 +145,8 @@ public void testFallback() throws InterruptedException, TimeoutException {
// Precondition: wait for DirectPath to connect
assertWithMessage("Failed to observe RPCs over DirectPath").that(exerciseDirectPath()).isTrue();

// Enable the blackhole, which will prevent communication via IPv6 and thus DirectPath.
blackholeIPv6.set(true);
// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeDpAddr.set(true);

// Send a request, which should be routed over IPv4 and CFE.
instrumentedClient.readRow(testEnvRule.env().getTableId(), "nonexistent-row");
Expand All @@ -154,14 +159,14 @@ public void testFallback() throws InterruptedException, TimeoutException {

// Make sure that the client will start reading from IPv6 again by sending new requests and
// checking the injected IPv6 counter has been updated.
blackholeIPv6.set(false);
blackholeDpAddr.set(false);

assertWithMessage("Failed to upgrade back to DirectPath").that(exerciseDirectPath()).isTrue();
}

private boolean exerciseDirectPath() throws InterruptedException, TimeoutException {
Stopwatch stopwatch = Stopwatch.createStarted();
numIPv6Read.set(0);
numDpAddrRead.set(0);

boolean seenEnough = false;

Expand All @@ -170,7 +175,7 @@ private boolean exerciseDirectPath() throws InterruptedException, TimeoutExcepti
instrumentedClient.readRow(testEnvRule.env().getTableId(), "nonexistent-row");
}
Thread.sleep(100);
seenEnough = numIPv6Read.get() >= MIN_COMPLETE_READ_CALLS;
seenEnough = numDpAddrRead.get() >= MIN_COMPLETE_READ_CALLS;
}
return seenEnough;
}
Expand Down Expand Up @@ -215,7 +220,7 @@ public NioSocketChannel newChannel() {
* make IPv6 packets disappear
*/
private class MyChannelHandler extends ChannelDuplexHandler {
private boolean isIPv6;
private boolean isDpAddr;

@Override
public void connect(
Expand All @@ -225,11 +230,13 @@ public void connect(
ChannelPromise promise)
throws Exception {

this.isIPv6 =
(remoteAddress instanceof InetSocketAddress)
&& ((InetSocketAddress) remoteAddress).getAddress() instanceof Inet6Address;
if (remoteAddress instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) remoteAddress).getAddress();
String addr = inetAddress.getHostAddress();
isDpAddr = addr.startsWith(DP_IPV6_PREFIX) || addr.startsWith(DP_IPV4_PREFIX);
}

if (!(isIPv6 && blackholeIPv6.get())) {
if (!(isDpAddr && blackholeDpAddr.get())) {
super.connect(ctx, remoteAddress, localAddress, promise);
} else {
// Fail the connection fast
Expand All @@ -239,7 +246,7 @@ public void connect(

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean dropCall = isIPv6 && blackholeIPv6.get();
boolean dropCall = isDpAddr && blackholeDpAddr.get();

if (dropCall) {
// Don't notify the next handler and increment counter
Expand All @@ -252,14 +259,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
boolean dropCall = isIPv6 && blackholeIPv6.get();
boolean dropCall = isDpAddr && blackholeDpAddr.get();

if (dropCall) {
// Don't notify the next handler and increment counter
numBlocked.incrementAndGet();
} else {
if (isIPv6) {
numIPv6Read.incrementAndGet();
if (isDpAddr) {
numDpAddrRead.incrementAndGet();
}
super.channelReadComplete(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public boolean isInstanceAdminSupported() {
}

public boolean isDirectPathEnabled() {
return "bigtable".equals(System.getenv("GOOGLE_CLOUD_ENABLE_DIRECT_PATH"));
return Boolean.getBoolean("bigtable.attempt-directpath");
}

public String getPrimaryZone() {
Expand Down