Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
adb81f2
chore: rationalize SessionImpl field visibility
igorbernstein2 May 27, 2026
0854204
chore: migrate pool scheduling to a hashed-wheel timer
igorbernstein2 Jun 12, 2026
54c287e
chore: introduce VOperation as the head of the middleware chain
igorbernstein2 Jun 12, 2026
c88862a
chore: introduce OpExecutor; plumb VRpcCallContext.getExecutor
igorbernstein2 Jun 12, 2026
8f99778
chore: move callback dispatch from RetryingVRpc to VRpcImpl
igorbernstein2 Jun 13, 2026
14dde0b
chore: add session SynchronizationContext alongside the existing lock
igorbernstein2 Jun 14, 2026
dd0ab58
chore: make startRpc and cancelRpc async via sessionSyncContext
igorbernstein2 Jun 14, 2026
f5a0130
chore: remove synchronized(lock) from SessionImpl
igorbernstein2 May 28, 2026
e03690f
chore: abort session on uncaught exception in sessionSyncContext
igorbernstein2 Jun 4, 2026
9174017
chore: isolate user-callback executor on a cached thread pool
igorbernstein2 Jun 14, 2026
9f741b0
chore: back OpExecutor with SerializingExecutor(userCallbackExecutor)
igorbernstein2 Jun 14, 2026
ac7b890
chore: configure gRPC session streams with DirectExecutor
igorbernstein2 May 28, 2026
ea69618
chore: route PendingVRpc per-op state through the op executor
igorbernstein2 May 28, 2026
a17cc1e
chore: consolidate cancel trampolines at VOperationImpl
igorbernstein2 Jun 14, 2026
fc26c64
chore: replace OpExecutor backing with an inline-capable queue; tight…
igorbernstein2 Jun 14, 2026
520a2be
test: add 30-second @Timeout to all session/pool integration tests
igorbernstein2 May 27, 2026
2f2a111
fix: arm PendingVRpc deadline monitor only after committing to queue
igorbernstein2 Jun 15, 2026
bc7b4c3
fix: synchronize Client.sessionPools access
igorbernstein2 Jun 15, 2026
bfa99cd
fix: synthesize fallback closeReason in notifyTerminalClose
igorbernstein2 Jun 15, 2026
305bd1e
fix: drain SessionPools before tearing down userCallbackExecutor on C…
igorbernstein2 Jun 15, 2026
e1e6f6c
fix: serialize VOperationImpl addListener through op executor
igorbernstein2 Jun 15, 2026
a52b63c
fix: gate tracer.onAttemptFinish on the Active stale-state check
igorbernstein2 Jun 15, 2026
d6f8244
fix: ShimImpl uses shutdownAndAwait for userCallbackExecutor
igorbernstein2 Jun 15, 2026
5e619a4
chore: replace fully-qualified type names with imports
igorbernstein2 Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
import com.google.cloud.bigtable.data.v2.internal.csm.Metrics;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
import com.google.cloud.bigtable.data.v2.internal.session.SessionPool;
import com.google.cloud.bigtable.data.v2.internal.session.BigtableTimer;
import com.google.cloud.bigtable.data.v2.internal.session.VRpcDescriptor;
import com.google.cloud.bigtable.data.v2.internal.util.ClientConfigurationManager;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executor;

public class AuthorizedViewAsync implements AutoCloseable, Closeable {

Expand All @@ -48,7 +49,8 @@ static AuthorizedViewAsync createAndStart(
String viewId,
Permission permission,
Metrics metrics,
ScheduledExecutorService executorService) {
BigtableTimer timer,
Executor userCallbackExecutor) {

AuthorizedViewName viewName =
AuthorizedViewName.builder()
Expand Down Expand Up @@ -78,7 +80,8 @@ static AuthorizedViewAsync createAndStart(
callOptions,
viewName.toString(),
metrics,
executorService);
timer,
userCallbackExecutor);

return new AuthorizedViewAsync(base);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,38 @@
import com.google.cloud.bigtable.data.v2.internal.csm.MetricsImpl;
import com.google.cloud.bigtable.data.v2.internal.csm.NoopMetrics;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
import com.google.cloud.bigtable.data.v2.internal.session.BigtableTimer;
import com.google.cloud.bigtable.data.v2.internal.session.NettyWheelTimer;
import com.google.cloud.bigtable.data.v2.internal.session.SessionPool;
import com.google.cloud.bigtable.data.v2.internal.util.ClientConfigurationManager;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.CallOptions;
import io.opencensus.stats.Stats;
import io.opencensus.tags.Tags;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class Client implements AutoCloseable {
private static final Logger logger = Logger.getLogger(Client.class.getName());

// Per-pool drain budget during close. One full watchdog tick (5 min) plus 1 min buffer; if a
// pool can't drain in that window, something is genuinely wrong on the server side and we give
// up on it so close() returns. The watchdog interval is what makes the worst case finite.
private static final Duration POOL_DRAIN_TIMEOUT = Duration.ofMinutes(6);

public static final FeatureFlags BASE_FEATURE_FLAGS =
FeatureFlags.newBuilder()
.setReverseScans(false)
Expand All @@ -71,13 +88,26 @@ public class Client implements AutoCloseable {
private final FeatureFlags featureFlags;
private final ClientInfo clientInfo;
private final Resource<ScheduledExecutorService> backgroundExecutor;
// Drains the per-op SerializingExecutor. Cached pool so a blocked user callback does not starve
// heartbeats, retry delays, or other vRPCs (which all run on backgroundExecutor).
// TODO: source from the gax TransportChannelProvider so transport and user-callback dispatch
// share the same pool. Blocked on missing APIs to extract the configured executor from gax.
private final Resource<ExecutorService> userCallbackExecutor;
// Hashed-wheel timer for heartbeat / deadline / watchdog / retry scheduling. Built over
// backgroundExecutor (the timer's tick thread dispatches bodies onto it). Single tick thread per
// Client, shared across every SessionPoolImpl.
private final BigtableTimer sessionTimer;

private final CallOptions defaultCallOptions;
private final ChannelPool channelPool;
private final Resource<Metrics> metrics;
private final Resource<ClientConfigurationManager> configManager;

private final Set<SessionPool<?>> sessionPools = Collections.newSetFromMap(new WeakHashMap<>());
// Set true at the start of close(); guards openTableAsync / openAuthorizedViewAsync /
// openMaterializedViewAsync so concurrent opens during shutdown don't create pools the close
// path won't see.
private final AtomicBoolean closed = new AtomicBoolean(false);

public static Client create(ClientSettings settings) throws IOException {
FeatureFlags featureFlags =
Expand All @@ -90,6 +120,12 @@ public static Client create(ClientSettings settings) throws IOException {
.build();

ScheduledExecutorService backgroundExecutor = Executors.newScheduledThreadPool(4);
ExecutorService userCallbackExecutor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("bigtable-callback-%d")
.setDaemon(true)
.build());

// TODO: compat layer: get this from settings
String universeDomain = "googleapis.com";
Expand Down Expand Up @@ -137,6 +173,7 @@ public static Client create(ClientSettings settings) throws IOException {
}
metrics.close();
backgroundExecutor.shutdown();
userCallbackExecutor.shutdown();
throw new RuntimeException("Failed to fetch initial config", e);
}

Expand All @@ -150,7 +187,8 @@ public static Client create(ClientSettings settings) throws IOException {
settings.getChannelProvider(),
Resource.createOwned(metrics, metrics::close),
Resource.createOwned(configManager, configManager::close),
Resource.createOwned(backgroundExecutor, backgroundExecutor::shutdown));
Resource.createOwned(backgroundExecutor, backgroundExecutor::shutdown),
Resource.createOwned(userCallbackExecutor, () -> shutdownAndAwait(userCallbackExecutor)));
}

public Client(
Expand All @@ -159,13 +197,18 @@ public Client(
ChannelProvider channelProvider,
Resource<Metrics> metrics,
Resource<ClientConfigurationManager> configManager,
Resource<ScheduledExecutorService> bgExecutor)
Resource<ScheduledExecutorService> bgExecutor,
Resource<ExecutorService> userCallbackExecutor)
throws IOException {
this.featureFlags = featureFlags;
this.clientInfo = clientInfo;
this.metrics = metrics;
this.configManager = configManager;
this.backgroundExecutor = bgExecutor;
this.userCallbackExecutor = userCallbackExecutor;
// Timer's tick thread dispatches bodies onto backgroundExecutor — tick-thread-blocking work
// (anything that takes a pool lock) gets handed off there instead of stalling the wheel.
this.sessionTimer = new NettyWheelTimer("bigtable-session-timer", bgExecutor.get());

defaultCallOptions = CallOptions.DEFAULT;

Expand All @@ -192,20 +235,66 @@ public Client(

@Override
public void close() {
sessionPools.forEach(
pool ->
pool.close(
CloseSessionRequest.newBuilder()
.setReason(CloseSessionReason.CLOSE_SESSION_REASON_USER)
.setDescription("Client closing")
.build()));
if (!closed.compareAndSet(false, true)) {
return; // idempotent
}

List<SessionPool<?>> toClose;
synchronized (sessionPools) {
toClose = new ArrayList<>(sessionPools);
}

CloseSessionRequest closeReq =
CloseSessionRequest.newBuilder()
.setReason(CloseSessionReason.CLOSE_SESSION_REASON_USER)
.setDescription("Client closing")
.build();

// Phase 1: initiate graceful close on each pool. Returns immediately; sessions transition
// CLOSING → graceful CloseSessionRequest → WAIT_SERVER_CLOSE → CLOSED asynchronously.
toClose.forEach(p -> p.close(closeReq));

// Phase 2: wait for sessions to drain. The pool's watchdog stays alive during this wait and
// escalates anything stuck in WAIT_SERVER_CLOSE longer than its tick interval (5 min). Once
// a pool's last session reaches CLOSED, drainedFuture completes and awaitTerminated returns.
// Sequential: worst case is POOL_DRAIN_TIMEOUT * N pools, but the happy path drains in << 1s.
for (SessionPool<?> pool : toClose) {
try {
if (!pool.awaitTerminated(POOL_DRAIN_TIMEOUT)) {
logger.warning(
"SessionPool did not drain within "
+ POOL_DRAIN_TIMEOUT
+ "; abandoning and continuing shutdown");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.WARNING, "Interrupted while draining SessionPool", e);
break;
}
}

// Phase 3: tear down infrastructure. By this point all listener.onClose tasks for in-flight
// RPCs are queued on their op executors (which run on userCallbackExecutor), and no new
// session responses are coming since every session is CLOSED. The 5s await inside
// userCallbackExecutor.close() is therefore just a guard for tasks in flight — it should
// return immediately in the typical case.
userCallbackExecutor.close();
metrics.close();
channelPool.close();
configManager.close();
// Stop the timer before tearing down backgroundExecutor (the timer's dispatcher).
sessionTimer.stop();
backgroundExecutor.close();
}

// Guard used by the open*Async entry points: fails fast once close() has flipped the closed flag.
private void checkNotClosed() {
  boolean alreadyClosed = closed.get();
  if (!alreadyClosed) {
    return;
  }
  throw new IllegalStateException("Client is closed");
}

public TableAsync openTableAsync(String tableId, Permission permission) {
checkNotClosed();
TableAsync tableAsync =
TableAsync.createAndStart(
featureFlags,
Expand All @@ -216,13 +305,17 @@ public TableAsync openTableAsync(String tableId, Permission permission) {
tableId,
permission,
metrics.get(),
backgroundExecutor.get());
sessionPools.add(tableAsync.getSessionPool());
sessionTimer,
userCallbackExecutor.get());
synchronized (sessionPools) {
sessionPools.add(tableAsync.getSessionPool());
}
return tableAsync;
}

public AuthorizedViewAsync openAuthorizedViewAsync(
String tableId, String viewId, OpenAuthorizedViewRequest.Permission permission) {
checkNotClosed();
AuthorizedViewAsync viewAsync =
AuthorizedViewAsync.createAndStart(
featureFlags,
Expand All @@ -234,13 +327,17 @@ public AuthorizedViewAsync openAuthorizedViewAsync(
viewId,
permission,
metrics.get(),
backgroundExecutor.get());
sessionPools.add(viewAsync.getSessionPool());
sessionTimer,
userCallbackExecutor.get());
synchronized (sessionPools) {
sessionPools.add(viewAsync.getSessionPool());
}
return viewAsync;
}

public MaterializedViewAsync openMaterializedViewAsync(
String viewId, OpenMaterializedViewRequest.Permission permission) {
checkNotClosed();
MaterializedViewAsync viewAsync =
MaterializedViewAsync.createAndStart(
featureFlags,
Expand All @@ -251,14 +348,19 @@ public MaterializedViewAsync openMaterializedViewAsync(
viewId,
permission,
metrics.get(),
backgroundExecutor.get());
sessionPools.add(viewAsync.getSessionPool());
sessionTimer,
userCallbackExecutor.get());
synchronized (sessionPools) {
sessionPools.add(viewAsync.getSessionPool());
}
return viewAsync;
}

public static class Resource<T> {
private T value;
private Runnable closer;
private final T value;
private final Runnable closer;
// Flipped at most once by close(), so the closer runs a single time however often close() is
// called.
private final AtomicBoolean closed = new AtomicBoolean(false);

public static <T> Resource<T> createOwned(T value, Runnable closer) {
return new Resource<>(value, closer);
Expand All @@ -273,12 +375,31 @@ private Resource(T value, Runnable closer) {
this.closer = closer;
}

/** Idempotent. Repeat calls are no-ops. */
public void close() {
this.closer.run();
if (closed.compareAndSet(false, true)) {
this.closer.run();
}
}

public T get() {
return value;
}
}

// Gives already-queued listener.onClose tasks a chance to finish before the executor goes away,
// but caps the wait at 5s so a pathological listener cannot hang the caller's close(). If the
// wait times out, or the calling thread is interrupted while waiting, the remaining tasks are
// cancelled via shutdownNow(); on interruption the thread's interrupt flag is restored first.
// Public so the compat ShimImpl (different package) can apply the same shutdown semantics to
// the user-callback executor it owns.
public static void shutdownAndAwait(ExecutorService exec) {
  exec.shutdown();
  boolean terminated;
  try {
    terminated = exec.awaitTermination(5, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    terminated = false;
  }
  if (!terminated) {
    exec.shutdownNow();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import com.google.cloud.bigtable.data.v2.internal.csm.Metrics;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
import com.google.cloud.bigtable.data.v2.internal.session.SessionPool;
import com.google.cloud.bigtable.data.v2.internal.session.BigtableTimer;
import com.google.cloud.bigtable.data.v2.internal.session.VRpcDescriptor;
import com.google.cloud.bigtable.data.v2.internal.util.ClientConfigurationManager;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executor;

public class MaterializedViewAsync implements AutoCloseable, Closeable {

Expand All @@ -44,7 +45,8 @@ public static MaterializedViewAsync createAndStart(
String viewId,
OpenMaterializedViewRequest.Permission permission,
Metrics metrics,
ScheduledExecutorService executorService) {
BigtableTimer timer,
Executor userCallbackExecutor) {

MaterializedViewName viewName =
MaterializedViewName.builder()
Expand Down Expand Up @@ -73,7 +75,8 @@ public static MaterializedViewAsync createAndStart(
callOptions,
viewId,
metrics,
executorService);
timer,
userCallbackExecutor);

return new MaterializedViewAsync(base);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import com.google.cloud.bigtable.data.v2.internal.csm.Metrics;
import com.google.cloud.bigtable.data.v2.internal.csm.attributes.ClientInfo;
import com.google.cloud.bigtable.data.v2.internal.session.SessionPool;
import com.google.cloud.bigtable.data.v2.internal.session.BigtableTimer;
import com.google.cloud.bigtable.data.v2.internal.session.VRpcDescriptor;
import com.google.cloud.bigtable.data.v2.internal.util.ClientConfigurationManager;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executor;

public class TableAsync implements AutoCloseable, Closeable {
private final TableBase base;
Expand All @@ -47,7 +48,8 @@ public static TableAsync createAndStart(
String tableId,
Permission permission,
Metrics metrics,
ScheduledExecutorService executorService) {
BigtableTimer timer,
Executor userCallbackExecutor) {

TableName tableName =
TableName.builder()
Expand Down Expand Up @@ -76,7 +78,8 @@ public static TableAsync createAndStart(
callOptions,
tableId,
metrics,
executorService);
timer,
userCallbackExecutor);

return new TableAsync(base);
}
Expand Down
Loading
Loading