Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions java-datastore/datastore-v1-proto-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.DirectRetryingExecutor;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.common.truth.Truth;
import com.google.datastore.v1.Filter;
import com.google.datastore.v1.KindExpression;
Expand All @@ -27,9 +37,13 @@
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.rpc.Code;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -48,7 +62,7 @@ public void setUp() throws GeneralSecurityException, IOException {
}

@Test
public void testQuerySplitterWithDefaultDb() throws DatastoreException {
public void testQuerySplitterWithDefaultDb() throws Exception {
Filter propertyFilter =
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
Query query =
Expand All @@ -59,8 +73,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {

PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).build();

List<Query> splits =
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
Truth.assertThat(splits).isNotEmpty();
splits.forEach(
split -> {
Expand All @@ -70,7 +83,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
}

@Test
public void testQuerySplitterWithDb() throws DatastoreException {
public void testQuerySplitterWithDb() throws Exception {
Filter propertyFilter =
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
Query query =
Expand All @@ -81,8 +94,7 @@ public void testQuerySplitterWithDb() throws DatastoreException {

PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-db").build();

List<Query> splits =
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);

Truth.assertThat(splits).isNotEmpty();
splits.forEach(
Expand All @@ -91,4 +103,85 @@ public void testQuerySplitterWithDb() throws DatastoreException {
Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter);
});
}

/**
* A generic helper method that executes a {@link Callable} with retries using the GAX retrying
* framework.
*
* <p>It configures a {@link DirectRetryingExecutor} with the provided {@link RetrySettings} and
* the custom {@link ResultRetryAlgorithmWithContext}.
*
* @param callable the action to execute
* @param retrySettings the retry configuration (backoff, max attempts, timeouts)
* @param resultRetryAlgorithm the algorithm to determine if a failed attempt should be retried
* @return the result of the callable execution
* @throws Exception if the execution fails after all retry attempts.
*/
private static <V> V runWithRetry(
Callable<V> callable,
RetrySettings retrySettings,
ResultRetryAlgorithmWithContext<V> resultRetryAlgorithm)
throws Exception {
ApiClock clock = NanoClock.getDefaultClock();
// We must wrap the result algorithm and timed algorithm into a RetryAlgorithm
// as required by DirectRetryingExecutor.
RetryAlgorithm<V> retryAlgorithm =
new RetryAlgorithm<>(
resultRetryAlgorithm, new ExponentialRetryAlgorithm(retrySettings, clock));

DirectRetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);
RetryingFuture<V> future = executor.createFuture(callable);

ApiFuture<V> submittedFuture = executor.submit(future);

try {
return submittedFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// submittedFuture.get() wraps any exception thrown during execution in an ExecutionException.
// We unwrap and rethrow the actual cause (Exception or Error) directly so that test failures
// report the root cause (e.g., DatastoreException or AssertionError) instead of the wrapper.
if (cause instanceof Exception) {
throw (Exception) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw e;
} catch (InterruptedException e) {
// Restore the interrupted status before rethrowing, as per Java concurrency best practices.
Thread.currentThread().interrupt();
throw e;
}
}

// This low-level Datastore client (proto-over-HTTP) does not have built-in retry logic
// (unlike the high-level google-cloud-datastore gRPC client). We must explicitly retry
// here to handle transient backend errors (such as Code.INTERNAL auth issues).
// We reuse GAX retrying utilities here in the test to implement this backoff/retry.
private static List<Query> getSplitsWithRetry(
Query query, PartitionId partition, int numSplits, Datastore datastore) throws Exception {
// Fail fast configuration to avoid long wait times during test failures
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setMaxAttempts(3)
.setInitialRetryDelayDuration(Duration.ofMillis(200))
.setRetryDelayMultiplier(1.5)
.setMaxRetryDelayDuration(Duration.ofMillis(500))
.setTotalTimeoutDuration(Duration.ofSeconds(2))
.build();
return runWithRetry(
() -> DatastoreHelper.getQuerySplitter().getSplits(query, partition, numSplits, datastore),
retrySettings,
new BasicResultRetryAlgorithm<List<Query>>() {
@Override
public boolean shouldRetry(Throwable prevThrowable, List<Query> prevResult) {
if (prevThrowable instanceof DatastoreException) {
DatastoreException de = (DatastoreException) prevThrowable;
return de.getCode() == Code.INTERNAL;
}
return false;
}
});
}
}
5 changes: 5 additions & 0 deletions java-datastore/google-cloud-datastore-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,28 @@
import static com.google.datastore.utils.DatastoreHelper.makeFilter;
import static com.google.datastore.utils.DatastoreHelper.makeValue;

import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
import com.google.api.gax.retrying.DirectRetryingExecutor;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.common.truth.Truth;
import com.google.datastore.utils.Datastore;
import com.google.datastore.utils.DatastoreException;
import com.google.datastore.utils.DatastoreHelper;
import com.google.datastore.v1.*;
import com.google.rpc.Code;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -44,7 +58,7 @@ public void setUp() throws GeneralSecurityException, IOException {
}

@Test
public void testQuerySplitterWithDefaultDb() throws DatastoreException {
public void testQuerySplitterWithDefaultDb() throws Exception {
Filter propertyFilter =
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
Query query =
Expand All @@ -55,8 +69,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {

PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).build();

List<Query> splits =
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
Truth.assertThat(splits).isNotEmpty();
splits.forEach(
split -> {
Expand All @@ -66,7 +79,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
}

@Test
public void testQuerySplitterWithDb() throws DatastoreException {
public void testQuerySplitterWithDb() throws Exception {
Filter propertyFilter =
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
Query query =
Expand All @@ -77,8 +90,7 @@ public void testQuerySplitterWithDb() throws DatastoreException {

PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-db").build();

List<Query> splits =
DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
List<Query> splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);

Truth.assertThat(splits).isNotEmpty();
splits.forEach(
Expand All @@ -87,4 +99,85 @@ public void testQuerySplitterWithDb() throws DatastoreException {
Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter);
});
}

/**
* A generic helper method that executes a {@link Callable} with retries using the GAX retrying
* framework.
*
* <p>It configures a {@link DirectRetryingExecutor} with the provided {@link RetrySettings} and
* the custom {@link ResultRetryAlgorithmWithContext}.
*
* @param callable the action to execute
* @param retrySettings the retry configuration (backoff, max attempts, timeouts)
* @param resultRetryAlgorithm the algorithm to determine if a failed attempt should be retried
* @return the result of the callable execution
* @throws Exception if the execution fails after all retry attempts.
*/
private static <V> V runWithRetry(
Callable<V> callable,
RetrySettings retrySettings,
ResultRetryAlgorithmWithContext<V> resultRetryAlgorithm)
throws Exception {
ApiClock clock = NanoClock.getDefaultClock();
// We must wrap the result algorithm and timed algorithm into a RetryAlgorithm
// as required by DirectRetryingExecutor.
RetryAlgorithm<V> retryAlgorithm =
new RetryAlgorithm<>(
resultRetryAlgorithm, new ExponentialRetryAlgorithm(retrySettings, clock));

DirectRetryingExecutor<V> executor = new DirectRetryingExecutor<>(retryAlgorithm);
RetryingFuture<V> future = executor.createFuture(callable);

ApiFuture<V> submittedFuture = executor.submit(future);

try {
return submittedFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// submittedFuture.get() wraps any exception thrown during execution in an ExecutionException.
// We unwrap and rethrow the actual cause (Exception or Error) directly so that test failures
// report the root cause (e.g., DatastoreException or AssertionError) instead of the wrapper.
if (cause instanceof Exception) {
throw (Exception) cause;
}
if (cause instanceof Error) {
throw (Error) cause;
}
throw e;
} catch (InterruptedException e) {
// Restore the interrupted status before rethrowing, as per Java concurrency best practices.
Thread.currentThread().interrupt();
throw e;
}
}

// This low-level Datastore client (proto-over-HTTP) does not have built-in retry logic
// (unlike the high-level google-cloud-datastore gRPC client). We must explicitly retry
// here to handle transient backend errors (such as Code.INTERNAL auth issues).
// We reuse GAX retrying utilities here in the test to implement this backoff/retry.
private static List<Query> getSplitsWithRetry(
Query query, PartitionId partition, int numSplits, Datastore datastore) throws Exception {
// Fail fast configuration to avoid long wait times during test failures
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setMaxAttempts(3)
.setInitialRetryDelayDuration(Duration.ofMillis(200))
.setRetryDelayMultiplier(1.5)
.setMaxRetryDelayDuration(Duration.ofMillis(500))
.setTotalTimeoutDuration(Duration.ofSeconds(2))
.build();
return runWithRetry(
() -> DatastoreHelper.getQuerySplitter().getSplits(query, partition, numSplits, datastore),
retrySettings,
new BasicResultRetryAlgorithm<List<Query>>() {
@Override
public boolean shouldRetry(Throwable prevThrowable, List<Query> prevResult) {
if (prevThrowable instanceof DatastoreException) {
DatastoreException de = (DatastoreException) prevThrowable;
return de.getCode() == Code.INTERNAL;
}
return false;
}
});
}
}
Loading