Skip to content

Commit b4a90b2

Browse files
feat: Add methods to wait for consistency with a token (#2762)
* feat: Add methods to wait for consistency with a token This change adds support for checking consistency using a provided token, enabling distributed workflows. It also adds convenience methods to BigtableTableAdminClient for generating and waiting for a token automatically. This addresses the Consistency Tokens CUJ. Tracking Bug: b/475820272 * chore: generate libraries at Mon Jan 26 21:09:41 UTC 2026 * fix: address comments * fix: address feedback --------- Co-authored-by: cloud-java-bot <cloud-java-bot@google.com>
1 parent 49fe769 commit b4a90b2

6 files changed

Lines changed: 136 additions & 2 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,30 @@ public ApiFuture<Void> awaitReplicationAsync(final String tableId) {
14601460
return stub.awaitReplicationCallable().futureCall(tableName);
14611461
}
14621462

1463+
/**
1464+
* Polls an existing consistency token until table replication is consistent across all clusters.
1465+
* Useful for checking consistency of a token generated in a separate process. Blocks until
1466+
* completion.
1467+
*
1468+
* @param tableId The table to check.
1469+
* @param consistencyToken The token to poll.
1470+
*/
1471+
public void waitForConsistency(String tableId, String consistencyToken) {
1472+
ApiExceptions.callAndTranslateApiException(waitForConsistencyAsync(tableId, consistencyToken));
1473+
}
1474+
1475+
/**
1476+
* Asynchronously polls the consistency token. Returns a future that completes when table
1477+
* replication is consistent across all clusters.
1478+
*
1479+
* @param tableId The table to check.
1480+
* @param consistencyToken The token to poll.
1481+
*/
1482+
public ApiFuture<Void> waitForConsistencyAsync(String tableId, String consistencyToken) {
1483+
return stub.awaitConsistencyCallable()
1484+
.futureCall(ConsistencyRequest.forReplication(tableId, consistencyToken));
1485+
}
1486+
14631487
/**
14641488
* Creates a new authorized view with the specified configuration.
14651489
*

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import com.google.bigtable.admin.v2.StandardReadRemoteWrites;
2424
import com.google.bigtable.admin.v2.TableName;
2525
import com.google.cloud.bigtable.data.v2.internal.TableAdminRequestContext;
26+
import com.google.common.base.Preconditions;
2627
import javax.annotation.Nonnull;
28+
import javax.annotation.Nullable;
2729

2830
@AutoValue
2931
public abstract class ConsistencyRequest {
@@ -33,14 +35,36 @@ public abstract class ConsistencyRequest {
3335
@Nonnull
3436
protected abstract CheckConsistencyRequest.ModeCase getMode();
3537

38+
/**
39+
* Internal accessor for the consistency token. Must be public to be accessible from the stub
40+
* package.
41+
*/
42+
@InternalApi
43+
@Nullable
44+
public abstract String getConsistencyToken();
45+
3646
public static ConsistencyRequest forReplication(String tableId) {
3747
return new AutoValue_ConsistencyRequest(
38-
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
48+
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, null);
49+
}
50+
51+
/**
52+
* Creates a request to check consistency using an existing token.
53+
*
54+
* @param tableId The table ID.
55+
* @param consistencyToken The token to check. Must not be null.
56+
* @throws NullPointerException if consistencyToken is null.
57+
*/
58+
public static ConsistencyRequest forReplication(String tableId, String consistencyToken) {
59+
Preconditions.checkNotNull(consistencyToken, "consistencyToken must not be null");
60+
61+
return new AutoValue_ConsistencyRequest(
62+
tableId, CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES, consistencyToken);
3963
}
4064

4165
public static ConsistencyRequest forDataBoost(String tableId) {
4266
return new AutoValue_ConsistencyRequest(
43-
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES);
67+
tableId, CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES, null);
4468
}
4569

4670
@InternalApi

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallable.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,15 @@ static AwaitConsistencyCallable create(
9393
@Override
9494
public ApiFuture<Void> futureCall(
9595
final ConsistencyRequest consistencyRequest, final ApiCallContext apiCallContext) {
96+
97+
// If the token is already provided, skip generation and poll directly.
98+
if (consistencyRequest.getConsistencyToken() != null) {
99+
CheckConsistencyRequest request =
100+
consistencyRequest.toCheckConsistencyProto(
101+
requestContext, consistencyRequest.getConsistencyToken());
102+
return pollToken(request, apiCallContext);
103+
}
104+
96105
ApiFuture<GenerateConsistencyTokenResponse> tokenFuture =
97106
generateToken(consistencyRequest.toGenerateTokenProto(requestContext), apiCallContext);
98107

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,31 @@ public void testTestIamPermissions() {
16571657
assertThat(actualResult).containsExactly("bigtable.backups.get");
16581658
}
16591659

1660+
@Test
1661+
public void testWaitForConsistencyWithToken() {
1662+
// Setup
1663+
Mockito.when(mockStub.awaitConsistencyCallable()).thenReturn(mockAwaitConsistencyCallable);
1664+
1665+
String token = "my-token";
1666+
ConsistencyRequest expectedRequest = ConsistencyRequest.forReplication(TABLE_ID, token);
1667+
1668+
final AtomicBoolean wasCalled = new AtomicBoolean(false);
1669+
1670+
Mockito.when(mockAwaitConsistencyCallable.futureCall(expectedRequest))
1671+
.thenAnswer(
1672+
(Answer<ApiFuture<Void>>)
1673+
invocationOnMock -> {
1674+
wasCalled.set(true);
1675+
return ApiFutures.immediateFuture(null);
1676+
});
1677+
1678+
// Execute
1679+
adminClient.waitForConsistency(TABLE_ID, token);
1680+
1681+
// Verify
1682+
assertThat(wasCalled.get()).isTrue();
1683+
}
1684+
16601685
private <ReqT, RespT, MetaT> void mockOperationResult(
16611686
OperationCallable<ReqT, RespT, MetaT> callable,
16621687
ReqT request,

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyRequestTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,22 @@ public void testToGenerateTokenProto() {
7979
assertThat(generateRequest.getName())
8080
.isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID));
8181
}
82+
83+
@Test
84+
public void testToCheckConsistencyProtoWithToken() {
85+
ConsistencyRequest consistencyRequest =
86+
ConsistencyRequest.forReplication(TABLE_ID, CONSISTENCY_TOKEN);
87+
88+
TableAdminRequestContext requestContext =
89+
TableAdminRequestContext.create(PROJECT_ID, INSTANCE_ID);
90+
91+
CheckConsistencyRequest checkConsistencyRequest =
92+
consistencyRequest.toCheckConsistencyProto(requestContext, CONSISTENCY_TOKEN);
93+
94+
assertThat(checkConsistencyRequest.getName())
95+
.isEqualTo(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID));
96+
assertThat(checkConsistencyRequest.getConsistencyToken()).isEqualTo(CONSISTENCY_TOKEN);
97+
assertThat(checkConsistencyRequest.getModeCase())
98+
.isEqualTo(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES);
99+
}
82100
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitConsistencyCallableTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.junit.Test;
4343
import org.junit.runner.RunWith;
4444
import org.junit.runners.JUnit4;
45+
import org.mockito.ArgumentMatchers;
4546
import org.mockito.Mock;
4647
import org.mockito.Mockito;
4748
import org.mockito.junit.MockitoJUnit;
@@ -325,4 +326,37 @@ public void testAwaitReplicationCallablePolling() throws Exception {
325326

326327
consistentFuture.get(1, TimeUnit.SECONDS);
327328
}
329+
330+
@Test
331+
public void testWithProvidedToken() throws Exception {
332+
// 1. Setup: Request with a pre-existing token
333+
String existingToken = "existing-token";
334+
ConsistencyRequest consistencyRequest =
335+
ConsistencyRequest.forReplication(TABLE_ID, existingToken);
336+
337+
// 2. Setup: Mock the check operation to succeed immediately
338+
CheckConsistencyRequest expectedCheckRequest =
339+
CheckConsistencyRequest.newBuilder()
340+
.setName(TABLE_NAME.toString())
341+
.setConsistencyToken(existingToken)
342+
.setStandardReadRemoteWrites(StandardReadRemoteWrites.newBuilder().build())
343+
.build();
344+
CheckConsistencyResponse expectedResponse =
345+
CheckConsistencyResponse.newBuilder().setConsistent(true).build();
346+
347+
Mockito.when(mockCheckConsistencyCallable.futureCall(expectedCheckRequest, CALL_CONTEXT))
348+
.thenReturn(ApiFutures.immediateFuture(expectedResponse));
349+
350+
// 3. Execute
351+
ApiFuture<Void> future = awaitConsistencyCallable.futureCall(consistencyRequest, CALL_CONTEXT);
352+
future.get(1, TimeUnit.SECONDS);
353+
354+
// 4. Verify: Generate was NEVER called, Check WAS called
355+
Mockito.verify(mockGenerateConsistencyTokenCallable, Mockito.never())
356+
.futureCall(
357+
ArgumentMatchers.any(GenerateConsistencyTokenRequest.class),
358+
ArgumentMatchers.any(ApiCallContext.class));
359+
Mockito.verify(mockCheckConsistencyCallable, Mockito.times(1))
360+
.futureCall(expectedCheckRequest, CALL_CONTEXT);
361+
}
328362
}

0 commit comments

Comments
 (0)