Skip to content

feat(bigtable): route point read rows to shim#13542

Open
mutianf wants to merge 1 commit into
googleapis:mainfrom
mutianf:stream
Open

feat(bigtable): route point read rows to shim#13542
mutianf wants to merge 1 commit into
googleapis:mainfrom
mutianf:stream

Conversation

@mutianf

@mutianf mutianf commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

No description provided.

@mutianf mutianf requested review from a team as code owners June 23, 2026 02:57

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces MaybePointReadCallable to route single-point ReadRows queries through a unary point-read callable, allowing them to benefit from session-shim optimizations. It also adds helper methods in Query to identify single-point queries and updates EnhancedBigtableStub to integrate this new routing logic. The reviewer feedback highlights several critical improvements: addressing potential synchronous exceptions and cancellation handling in MaybePointReadCallable to comply with the ResponseObserver contract, and properly propagating custom retryableCodes when routing ReadRows calls to point reads.

Comment on lines +53 to +89
ApiFuture<RowT> future = pointReader.futureCall(request, context);
responseObserver.onStart(
new StreamController() {
@Override
public void cancel() {
future.cancel(false);
}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void request(int count) {}
});

ApiFutures.addCallback(
future,
new ApiFutureCallback<RowT>() {
@Override
public void onSuccess(RowT row) {
if (row != null) {
try {
responseObserver.onResponse(row);
} catch (Throwable t) {
responseObserver.onError(t);
return;
}
}
responseObserver.onComplete();
}

@Override
public void onFailure(Throwable t) {
responseObserver.onError(t);
}
},
MoreExecutors.directExecutor());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If pointReader.futureCall(request, context) throws a synchronous exception (for example, due to validation or precondition failures), responseObserver.onStart will never be called, and the exception will propagate synchronously to the caller. According to the ResponseObserver contract, onStart must be called before any other interaction, and errors should be propagated via onError rather than thrown synchronously.

Additionally, if the client cancels the stream, the callback's onFailure will be invoked with a CancellationException, which would propagate to responseObserver.onError. Since the client initiated the cancellation, we should suppress this error to avoid sending unexpected signals to the observer.

We can resolve both issues by calling onStart first, tracking the cancellation state, and safely catching any synchronous exceptions from futureCall.

    final java.util.concurrent.atomic.AtomicReference<ApiFuture<RowT>> futureRef =
        new java.util.concurrent.atomic.AtomicReference<>();
    final java.util.concurrent.atomic.AtomicBoolean cancelled =
        new java.util.concurrent.atomic.AtomicBoolean(false);

    responseObserver.onStart(
        new StreamController() {
          @Override
          public void cancel() {
            cancelled.set(true);
            ApiFuture<RowT> future = futureRef.get();
            if (future != null) {
              future.cancel(false);
            }
          }

          @Override
          public void disableAutoInboundFlowControl() {}

          @Override
          public void request(int count) {}
        });

    if (cancelled.get()) {
      return;
    }

    ApiFuture<RowT> future;
    try {
      future = pointReader.futureCall(request, context);
      futureRef.set(future);
    } catch (Throwable t) {
      responseObserver.onError(t);
      return;
    }

    if (cancelled.get()) {
      future.cancel(false);
      return;
    }

    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<RowT>() {
          @Override
          public void onSuccess(RowT row) {
            if (cancelled.get()) {
              return;
            }
            if (row != null) {
              try {
                responseObserver.onResponse(row);
              } catch (Throwable t) {
                responseObserver.onError(t);
                return;
              }
            }
            responseObserver.onComplete();
          }

          @Override
          public void onFailure(Throwable t) {
            if (cancelled.get() || t instanceof java.util.concurrent.CancellationException) {
              return;
            }
            responseObserver.onError(t);
          }
        },
        MoreExecutors.directExecutor());
References
  1. Ensure that any SettableApiFuture or similar future created during RPC initialization is guaranteed to be completed (either successfully or exceptionally) even if the RPC fails to start or throws an exception before registration. Wrap the RPC start in a try-catch block to handle initialization failures and complete the future exceptionally.

Comment on lines +271 to +274
return new MaybePointReadCallable<>(
classic,
createPointReadCallable(
rowAdapter, "ReadRows", perOpSettings.readRowsSettings.getRetrySettings()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When routing a ReadRows call to a point read, we should respect the custom retryableCodes configured for readRowsSettings rather than falling back to the default readRowSettings codes. Let's pass the configured retryableCodes to createPointReadCallable.

    return new MaybePointReadCallable<>(
        classic,
        createPointReadCallable(
            rowAdapter,
            "ReadRows",
            perOpSettings.readRowsSettings.getRetryableCodes(),
            perOpSettings.readRowsSettings.getRetrySettings()));

Comment on lines 291 to +304
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
return createPointReadCallable(
rowAdapter, "ReadRow", perOpSettings.readRowSettings.getRetrySettings());
}

private <RowT> UnaryCallable<Query, RowT> createPointReadCallable(
RowAdapter<RowT> rowAdapter, String spanName, RetrySettings retrySettings) {
ClientContext clientContext = bigtableClientContext.getClientContext();

ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
.setRetryableCodes(perOpSettings.readRowSettings.getRetryableCodes())
.setRetrySettings(perOpSettings.readRowSettings.getRetrySettings())
.setRetrySettings(retrySettings)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Update createPointReadCallable to accept retryableCodes as a parameter so that it can be customized depending on whether the call originated from ReadRow or ReadRows.

  public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
    return createPointReadCallable(
        rowAdapter,
        "ReadRow",
        perOpSettings.readRowSettings.getRetryableCodes(),
        perOpSettings.readRowSettings.getRetrySettings());
  }

  private <RowT> UnaryCallable<Query, RowT> createPointReadCallable(
      RowAdapter<RowT> rowAdapter,
      String spanName,
      java.util.Set<com.google.api.gax.rpc.StatusCode.Code> retryableCodes,
      RetrySettings retrySettings) {
    ClientContext clientContext = bigtableClientContext.getClientContext();

    ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
        createReadRowsBaseCallable(
            ServerStreamingCallSettings.<ReadRowsRequest, Row>newBuilder()
                .setRetryableCodes(retryableCodes)
                .setRetrySettings(retrySettings)

Comment on lines +324 to +325
UnaryCallSettings<?, ?> shimSettings =
perOpSettings.readRowSettings.toBuilder().setRetrySettings(retrySettings).build();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Ensure that the custom retryableCodes are also propagated to the shimSettings used by the session shim.

Suggested change
UnaryCallSettings<?, ?> shimSettings =
perOpSettings.readRowSettings.toBuilder().setRetrySettings(retrySettings).build();
UnaryCallSettings<?, ?> shimSettings =
perOpSettings.readRowSettings
.toBuilder()
.setRetryableCodes(retryableCodes)
.setRetrySettings(retrySettings)
.build();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant