chore: refactor concurrency model in the bigtable session protocol #13485

Draft
igorbernstein2 wants to merge 24 commits into googleapis:main from igorbernstein2:threading-refactor-v2

Conversation

@igorbernstein2

Contributor

No description provided.

Document lock ownership before later steps remove the lock. Add @GuardedBy
to fields under the lock, move heartbeatInterval read into the synchronized
block in startRpc(), and comment fields intentionally read outside the lock.
No runtime change.
Replace ScheduledExecutorService with a BigtableTimer (Netty hashed wheel,
will move in-tree later) for heartbeat, deadline, watchdog, AFE-prune,
retry-create-session, and retry-delay. Owned by Client and shared across
pools.
Replaces CancellableVRpc with a VOperation layer that sits above the VRpc
chain rather than inside it. VOperationImpl owns the gRPC Context
cancellation listener and constructs the per-op VRpcCallContext; downstream
middleware just sees the chain.
VRpcCallContext.getExecutor() returns OpExecutor (thin wrapper with
runningThread affinity tracking). VOperationImpl constructs the per-call
SynchronizationContext + OpExecutor; RetryingVRpc drops its own SyncContext
and dispatches via ctx.getExecutor(). The uncaught-handler safety net moves
from RetryingVRpc up to VOperationImpl.
VRpcImpl.handle*() methods now dispatch listener callbacks via
ctx.getExecutor(), with CAS STARTED->CLOSED in all three (handleError no
longer proceeds from NEW) and decode moved into the executor task.
RetryingVRpc.Active drops its own wrap since callbacks already arrive on
the op executor. start() publishes ctx/listener only after winning the CAS
so a racing duplicate can't corrupt the winner's fields. SessionPoolImpl's
three direct listener.onClose paths also dispatch via ctx.getExecutor().
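
The CAS discipline described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: SketchRpc, its State enum, and the Consumer<String> listener are hypothetical stand-ins for VRpcImpl and its listener, and the sketch assumes handleClose can only be reached after start() has returned.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for a VRpc; all names here are illustrative only.
final class SketchRpc {
  enum State { NEW, STARTED, CLOSED }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
  // Written only by the caller that wins NEW -> STARTED.
  private volatile Executor executor;
  private volatile Consumer<String> listener;

  /** Returns false for a racing duplicate start; the winner's fields stay untouched. */
  boolean start(Executor executor, Consumer<String> listener) {
    if (!state.compareAndSet(State.NEW, State.STARTED)) {
      return false;
    }
    // Publish only after winning the CAS.
    this.executor = executor;
    this.listener = listener;
    return true;
  }

  /**
   * Delivers a terminal result at most once, and never from NEW.
   * Assumption: the transport only calls this after start() has returned.
   */
  boolean handleClose(String result) {
    if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
      return false; // still NEW, or another handler already closed the call
    }
    executor.execute(() -> listener.accept(result));
    return true;
  }
}
```

Because every handler goes through the same STARTED -> CLOSED transition, whichever one wins is the only one that reaches the listener.
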
SessionImpl gains sessionSyncContext that serializes stream callbacks.
onMessage/onClose dispatch onto it, and the per-session heartbeat tick
trampolines through it. synchronized(lock) blocks remain inside the
handlers — the two coexist for now. Affinity asserts added at boundary
methods and every handle*.
SessionImpl.startRpc and cancelRpc now submit to sessionSyncContext rather
than running synchronously on the caller. VRpcSessionApi.startRpc is void
— errors flow through rpc.handleError() onto ctx.getExecutor(). VRpcImpl
drops its synchronous post-startRpc error branch.
All session state mutations now run on sessionSyncContext, so the per-session
Object lock is no longer needed. Public methods (start, close, forceClose,
startRpc, cancelRpc) submit onto sessionSyncContext; nextRpcId becomes
AtomicLong for the cross-thread newCall() caller. handleVRpcResponse and
handleVRpcErrorResponse drop the localCancel/localRpc capture-and-recheck
dance — sessionSyncContext serializes them now. Stale lock-era comments
on the fields are replaced with a sessionSyncContext-ownership note.
SessionImplTest.testHeartbeat polls for the now-async nextHeartbeat update.
Split terminal close into notifyTerminalClose (per-target try/catch fan-out)
and abortFromUncaughtException (global handler). Uncaught syncContext
exceptions always set closeReason to ERROR — the prior reason is folded into
the description so tracer/metrics correctly attribute aborts. Adds three
regression tests (listener.onReady throws, onClose throws, both throw).
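
A per-target try/catch fan-out of the kind described for notifyTerminalClose might look like the sketch below. TerminalFanOut and notifyEach are hypothetical names, and returning the failures to the caller is an assumption made for the example; the PR may log or report them differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not the PR's notifyTerminalClose.
final class TerminalFanOut {
  /** Runs every target and collects failures so one throwing target cannot suppress the rest. */
  static List<RuntimeException> notifyEach(List<Runnable> targets) {
    List<RuntimeException> failures = new ArrayList<>();
    for (Runnable target : targets) {
      try {
        target.run();
      } catch (RuntimeException e) {
        failures.add(e); // keep going; later targets still get notified
      }
    }
    return failures;
  }
}
```
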
Client (and ShimImpl) own a dedicated bigtable-callback-%d cached pool,
plumbed through *Async / TableBase. A blocked user callback can no longer
starve heartbeats, retry delays, or pool bookkeeping (all of which run on
backgroundExecutor). The op-level SerializingExecutor in a later commit
will dispatch onto this pool.
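
For illustration, a cached pool with the bigtable-callback-%d naming could be built with plain JDK classes as below. CallbackPools and newCallbackPool are hypothetical names, and marking the threads as daemon is an assumption of this sketch rather than something the commit states.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory; the PR's actual construction code is not shown in this thread.
final class CallbackPools {
  static ExecutorService newCallbackPool() {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory = runnable -> {
      Thread thread =
          new Thread(runnable, String.format("bigtable-callback-%d", counter.getAndIncrement()));
      thread.setDaemon(true); // assumption: callback threads should not block JVM exit
      return thread;
    };
    // A cached pool grows on demand, so one blocked user callback does not stall the others.
    return Executors.newCachedThreadPool(factory);
  }
}
```
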
VOperationImpl now constructs OpExecutor over a per-call SequentialExecutor
on the shared userCallbackExecutor, replacing the per-call
SynchronizationContext. OpExecutor gains an UncaughtExceptionHandler ctor
arg — the safety net that the removed RetryingVRpc-owned SyncContext
provided. The 3-arg VRpcCallContext.create defaults to a no-op handler for
tests; production callers go through VOperationImpl.
CallOptions.withExecutor(MoreExecutors.directExecutor()) on the session
stream so Netty I/O threads deliver SessionStream.Listener callbacks
directly; sessionSyncContext immediately trampolines off them.
PendingVRpc.cancel and drainTo move isCancelled/realCall onto
ctx.getExecutor(); the pool lock now covers only queue / poolState / session
list. close() switches to cancelWithResult to honor the new op-executor
contract. NOOP_CALL sentinel removed.
VOperationImpl captures opExecutor in start() and trampolines start/cancel
via it. RetryingVRpc.start runs synchronously on the op-executor task;
RetryingVRpc.cancel no longer wraps in execute. Tracer.onOperationStart
reordered before started=true (a throwing tracer short-circuits to direct
listener.onClose). listener.onMessage failures classify as USER_FAILURE.
CleanupListener tracks a closed flag to prevent gRPC-context listener
leaks on synchronous chain close.
…en Client.close

OpExecutor switches from a SequentialExecutor backing to an internal
ArrayDeque + drain loop, and gains runInline() — runs the task synchronously
on the caller thread when the executor is idle, otherwise queues it.
VOperationImpl uses runInline for chain.start so the start dispatch skips
the queue+drain round-trip. Drain rejections (e.g. during shutdown) reset
drainScheduled so subsequent submissions can retry.
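
The queue-plus-drain-loop shape described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the PR's OpExecutor: SketchOpExecutor is a hypothetical name, task failures are swallowed where the real class forwards them to its UncaughtExceptionHandler, and rethrowing the rejection to the submitter is a choice made for the example.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical serializing executor: at most one thread runs tasks at a time, in FIFO order.
final class SketchOpExecutor implements Executor {
  private final Executor backing;
  private final ArrayDeque<Runnable> queue = new ArrayDeque<>(); // guarded by this
  private boolean drainScheduled; // guarded by this; true while some thread owns the queue

  SketchOpExecutor(Executor backing) {
    this.backing = backing;
  }

  @Override
  public void execute(Runnable task) {
    synchronized (this) {
      queue.add(task);
      if (drainScheduled) {
        return; // the current owner will pick this task up
      }
      drainScheduled = true;
    }
    try {
      backing.execute(this::drain);
    } catch (RejectedExecutionException e) {
      synchronized (this) {
        drainScheduled = false; // the task stays queued; a later submission can retry the drain
      }
      throw e;
    }
  }

  /** Runs the task on the caller thread when idle; otherwise queues it behind pending work. */
  void runInline(Runnable task) {
    synchronized (this) {
      if (drainScheduled) {
        queue.add(task);
        return;
      }
      drainScheduled = true; // the caller thread becomes the owner
    }
    try {
      task.run();
    } finally {
      drain(); // run anything that was queued while the inline task executed
    }
  }

  private void drain() {
    while (true) {
      Runnable next;
      synchronized (this) {
        next = queue.poll();
        if (next == null) {
          drainScheduled = false;
          return;
        }
      }
      try {
        next.run();
      } catch (RuntimeException e) {
        // Sketch only: a real implementation would hand this to an uncaught-exception handler.
      }
    }
  }
}
```

Work submitted from inside a running task, through either execute or runInline, lands in the queue and runs after the current task finishes, which is the FIFO property the start dispatch relies on.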

Client.close drains userCallbackExecutor first via a 5s-bounded
shutdownAndAwait so pool.close's cancelWithResult onClose notifications
complete before the executor is torn down. Resource.close becomes idempotent.
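
A bounded shutdown helper along these lines is a common JDK idiom. The sketch below is an assumption about the general shape, not the actual Client.shutdownAndAwait: the Shutdowns class, the Duration parameter, and the boolean return value are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating a time-bounded executor shutdown.
final class Shutdowns {
  /** Returns true if the executor terminated within the budget. */
  static boolean shutdownAndAwait(ExecutorService executor, Duration budget) {
    executor.shutdown(); // stop accepting new work; already-queued tasks still run
    try {
      if (executor.awaitTermination(budget.toMillis(), TimeUnit.MILLISECONDS)) {
        return true;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
    executor.shutdownNow(); // budget exhausted or interrupted: cancel what is left
    return false;
  }
}
```
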
Tests that call Future.get() on vRpc chains can hang indefinitely if an
exception breaks the callback dispatch chain and orphans the future (see
ISSUE-006). A class-level JUnit 5 @Timeout(30) converts a silent hang into
a clear test failure within a bounded time.

Applied to: RetryingVRpcTest, VRpcTracerTest, ClientTest, TableBaseTest,
SessionImplTest, SessionPoolImplTest.
PendingVRpc.start scheduled the deadline timer before the synchronized
pool-state check, so the pool-closed early-return path returned without
cancelling it. The timer fired later and called listener.onClose a second
time with DEADLINE_EXCEEDED. RetryingVRpc.Active suppressed the duplicate
at the user-facing layer via its currentState guard, but tracer.onAttemptFinish
still ran a second time for the already-finished attempt — corrupting per-
attempt metrics.

Move the monitorDeadline call inside the synchronized block, after the
pool-state check, so closed pools take the fast-fail path without scheduling.

Adds SessionPoolImplTest#pendingVRpcOnClosedPoolDoesNotLeakDeadlineMonitor;
verified the test fails against the pre-fix code.
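
The ordering fix can be illustrated with a small sketch. SketchPendingStart, the armDeadline hook, and the string result are hypothetical simplifications; in the PR the close notification is dispatched through ctx.getExecutor() rather than invoked directly.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for PendingVRpc.start; names are illustrative only.
final class SketchPendingStart {
  private boolean poolClosed; // guarded by this

  synchronized void closePool() {
    poolClosed = true;
  }

  /**
   * Arms the deadline only on the path that actually queues the call.
   * Returns true if the call was queued, false if it failed fast.
   */
  boolean start(Runnable armDeadline, Consumer<String> onClose) {
    synchronized (this) {
      if (!poolClosed) {
        armDeadline.run(); // after the state check, inside the same critical section
        return true;
      }
    }
    // Closed pool: no timer was armed, so nothing can deliver a second onClose later.
    onClose.accept("UNAVAILABLE: SessionPool is closed");
    return false;
  }
}
```
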
sessionPools is Collections.newSetFromMap(new WeakHashMap<>()) — neither
thread-safe nor synchronized. close() iterated it via forEach while
openTableAsync / openAuthorizedViewAsync / openMaterializedViewAsync added
to it concurrently, risking ConcurrentModificationException or a silently
skipped just-added pool that close() believed it had closed.

Wrap all four sites in synchronized(sessionPools) blocks. close() snapshots
into an ArrayList under the monitor and iterates the copy outside, so the
slow pool.close() calls don't block concurrent opens.
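
The snapshot-under-monitor pattern could look like this. PoolRegistry is a hypothetical wrapper introduced for the example; the PR applies the same idea directly to the sessionPools field.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper around a weak, non-thread-safe set of pools.
final class PoolRegistry<P> {
  private final Set<P> pools = Collections.newSetFromMap(new WeakHashMap<>());

  void register(P pool) {
    synchronized (pools) {
      pools.add(pool);
    }
  }

  /** Copies the live entries while holding the monitor. */
  List<P> snapshot() {
    synchronized (pools) {
      return new ArrayList<>(pools);
    }
  }

  /** Iterates a copy, so slow closers never hold the monitor and concurrent adds are safe. */
  void closeAll(Consumer<P> closer) {
    for (P pool : snapshot()) {
      closer.accept(pool);
    }
  }
}
```

Note that a pool registered after the snapshot is taken is not closed by that closeAll call, which is why the PR separately rejects opens once close has begun.
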
notifyTerminalClose dereferenced closeReason without a null guard. Today
every caller sets it first (forceClose, startGracefulClose's null-synth,
dispatchStreamClosed's else branch, abortFromUncaughtException), so the
invariant holds — but it is fragile: a future writer of WAIT_SERVER_CLOSE
that forgets to set closeReason would NPE here.

A precondition checkState would be wrong: the function's whole purpose is
to fan out terminal notifications behind per-notification try/catch so one
failure does not suppress the rest. A throw before any notification runs
escapes to the sessionSyncContext uncaught handler, which finds the state
already CLOSED and early-returns — all cleanup silently skipped.

Mirror the synthesizer pattern from startGracefulClose: log a warning with
an IllegalStateException for stack-trace observability, then build a
fallback CloseSessionRequest so the rest of the fan-out runs unchanged.
…lient.close

Client.close shut down userCallbackExecutor before draining the SessionPools
that depend on it, so late listener.onClose tasks from in-flight RPCs
arrived after backing was dead and got RejectedExecutionException — silently
stranding the user's terminal callbacks. The earlier fix sprinkled
inline-drain fallbacks inside OpExecutor; restructure shutdown instead so
the race can't happen.

SessionPool gains awaitTerminated(Duration), backed by a CompletableFuture
that SessionPoolImpl completes from onSessionClose once the pool is CLOSED and
the last session has drained. close() no longer kills the watchdog —
awaitTerminated takes ownership of that, so the watchdog stays alive during
shutdown and can escalate any session stuck in WAIT_SERVER_CLOSE longer
than its tick interval (5 min) via forceClose.
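
A CompletableFuture-backed awaitTerminated might be sketched as follows. SketchPool and its session counter are hypothetical, and the watchdog handoff described above is left out of the example entirely.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical pool that signals termination once it is closed and has no live sessions.
final class SketchPool {
  private final CompletableFuture<Void> terminated = new CompletableFuture<>();
  private boolean closed; // guarded by this
  private int liveSessions; // guarded by this

  synchronized void addSession() {
    liveSessions++;
  }

  void close() {
    synchronized (this) {
      closed = true;
    }
    maybeComplete();
  }

  void onSessionClose() {
    synchronized (this) {
      liveSessions--;
    }
    maybeComplete();
  }

  private void maybeComplete() {
    boolean done;
    synchronized (this) {
      done = closed && liveSessions == 0;
    }
    if (done) {
      terminated.complete(null); // idempotent: later calls are no-ops
    }
  }

  /** Returns true if the pool terminated within the timeout. */
  boolean awaitTerminated(Duration timeout) {
    try {
      terminated.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```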

Client.close becomes three explicit phases: (1) initiate graceful close on
each pool, (2) awaitTerminated on each with a 6-minute per-pool budget
(one full watchdog tick plus buffer), (3) tear down userCallbackExecutor /
channelPool / timers in the existing order, now safely because all
listener.onClose tasks are queued or drained before backing dies.

Add a `closed` AtomicBoolean + checkNotClosed() guard on the three openers
so concurrent opens during shutdown can't create pools the close path
won't see. close() is now idempotent via CAS on that flag.
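
The guard-plus-CAS shape can be sketched like this. SketchClient, the string returned by openTable, and the teardown counter are hypothetical. In this sketch the check and the open are not atomic with close(), so a real implementation would still need to pair the guard with whatever lock protects pool registration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client illustrating an idempotent close and a closed-guard on openers.
final class SketchClient implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean();
  final AtomicInteger teardowns = new AtomicInteger(); // exposed only so the sketch is observable

  private void checkNotClosed() {
    if (closed.get()) {
      throw new IllegalStateException("Client is closed");
    }
  }

  String openTable(String name) {
    checkNotClosed();
    return "pool:" + name; // placeholder for creating and registering a session pool
  }

  @Override
  public void close() {
    if (!closed.compareAndSet(false, true)) {
      return; // a previous close already won; nothing to do
    }
    teardowns.incrementAndGet(); // the real teardown phases would run here, exactly once
  }
}
```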

Tests:
  - ClientTest#openAfterCloseThrows: openTable/View/MaterializedView all
    throw IllegalStateException after close
  - ClientTest#closeIsIdempotent: double close is a no-op
  - SessionPoolImplTest#awaitTerminatedReturnsTrueWhenPoolIsEmpty
  - SessionPoolImplTest#awaitTerminatedReturnsTrueAfterSessionsDrain
  - SessionPoolImplTest tearDown now calls awaitTerminated so the watchdog
    is closed before testTimer.stop races its self-reschedule
  - FakeSessionPool in TableBaseTest gains a no-op awaitTerminated stub

Drive-by: remove the spurious @Nested annotation from SessionPoolImplTest's
top-level class. @Nested is meaningful only on non-static inner classes;
on the outer class it caused Surefire to mis-attribute test counts (outer
tests reported under $RetrySessionCreation). Tests were executed correctly
either way, just the reporting was wrong.

Full module suite (2375 tests) passes in 3:15, unchanged from before.
VOperationImpl.start read wrapped.closed on the caller thread and then
called grpcContext.addListener if false. If chain.start had asynchronously
queued an onClose (PendingVRpc pool-closed fast-fail, VRpcImpl
deadline-exceeded short-circuit), the backing-thread drain could land
between the read and the addListener call — CleanupListener.onClose would
set closed=true and call removeListener as a no-op pre-registration, and
the caller would then register a listener with nothing to remove it.

The leak is per-RPC, permanent until grpcContext cancels. For long-lived
application contexts that pin chain → opExecutor → userCallbackExecutor,
this accumulates indefinitely.

Queue the addListener through exec.runInline so FIFO ordering with the
op executor guarantees soundness: any onClose chain.start enqueued drains
first, so wrapped.closed is accurate by the time we evaluate it. If
grpcContext is cancelled between start() returning and the queued task
running, addListener+directExecutor fires the cancellationListener
immediately at registration time, so cancel still propagates correctly.

runInline (not execute) is the right verb here: when chain.start enqueued
nothing (common path) the executor is idle and the registration body runs
inline on the caller thread — no extra context switch. When chain.start
did enqueue, runInline takes the queue branch and FIFO drains both.

Adds VOperationImplTest with two tests:
  - grpcContextCancelPropagatesToChain: normal-path sanity that
    addListener still wires cancellation through to chain.cancel
  - asyncOnCloseFromChainDoesNotPropagateLaterContextCancel: regression —
    with an async-queued onClose, the cancellationListener must not be
    registered, so a later grpcContext.cancel does not reach chain.cancel
Active.onClose called tracer.onAttemptFinish before the currentState !=
Active.this guard, so a discarded onClose still counted as an attempt
finish. Today nothing reaches this path with a stale Active (VRpcImpl's
CAS blocks late deliveries upstream), but the ordering is a latent
hazard — any future code change that lets a late onClose through would
double-fire tracer.onAttemptFinish for the same attempt and skew per-
attempt metrics.

Swap the two lines so the guard runs first, matching onMessage above.
No behavior change today since the guard doesn't fire in current code
paths; this is structural defense.
ShimImpl registered its userCallbackExecutor with a bare shutdown() closer,
while Client.create's path uses shutdownAndAwait (which gives in-flight
listener.onClose tasks a 5-second drain window before shutdownNow). On
ShimImpl close, queued callbacks were abandoned mid-flight — fine for
quiescent shutdowns but a regression for fast-close patterns (test boundaries,
dynamic config reloads) where in-flight callbacks have not yet drained.

Promote Client.shutdownAndAwait from private-static to public-static so
ShimImpl (different package) can reuse the same shutdown semantics, and
update ShimImpl to call it.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors the threading and execution model of the Bigtable client by introducing a per-operation serializing executor (OpExecutor), a unified operation wrapper (VOperationImpl), and a hashed-wheel timer (BigtableTimer/NettyWheelTimer) to handle scheduling tasks like heartbeats and deadlines without stalling Netty I/O threads. Additionally, SessionImpl and SessionPoolImpl are refactored to use SynchronizationContext instead of synchronized blocks. The review feedback highlights critical thread-safety and robustness issues, including potential visibility bugs from removing volatile on exposed session parameters, a potential NullPointerException in PendingVRpc.cancel when ctx is null, and the need to guard PendingVRpc.start against already-cancelled operations.

Comment on lines +109 to +111
  private OpenParams openParams;

- private volatile boolean openParamsUpdated;
+ private boolean openParamsUpdated;

high

Removing the volatile keyword from openParams and openParamsUpdated introduces a Java Memory Model (JMM) visibility and data-race issue. Even if there are no off-context production readers today, these fields are exposed via public getters (such as getOpenParams() and isOpenParamsUpdated()). Any external thread (like SessionPoolImpl or background threads) calling these getters directly without executing on the sessionSyncContext could read stale values or fail to see updates entirely. Keeping them volatile is crucial for thread safety and defensive programming.

Suggested change
- private OpenParams openParams;
- private boolean openParamsUpdated;
+ private volatile OpenParams openParams;
+ private volatile boolean openParamsUpdated;

Comment on lines 739 to +743
    private void cancel(Status status, boolean onlyCancelPendingCall) {
      boolean delegateToRealCall = true;
      synchronized (SessionPoolImpl.this) {
        if (isCancelled) {
          return;
        }
        pendingRpcs.remove(this); // eager removal; no-op if already drained
      }
      ctx.getExecutor().execute(() -> {

high

If cancel is called on a PendingVRpc before start() has been invoked, ctx will be null, leading to a NullPointerException when calling ctx.getExecutor(). To prevent this, we should check if ctx is null under the SessionPoolImpl.this monitor lock, set isCancelled = true, and return early. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.

    private void cancel(Status status, boolean onlyCancelPendingCall) {
      synchronized (SessionPoolImpl.this) {
        pendingRpcs.remove(this); // eager removal; no-op if already drained
        if (ctx == null) {
          isCancelled = true;
          return;
        }
      }
      ctx.getExecutor().execute(() -> {
References
  1. In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.

Comment on lines 693 to 701
  synchronized (SessionPoolImpl.this) {
    if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
-     listener.onClose(
+     VRpcResult result =
          VRpcResult.createUncommitedError(
              Status.UNAVAILABLE.withCause(
-                 new IllegalStateException("SessionPool is closed"))));
+                 new IllegalStateException("SessionPool is closed")));
+     ctx.getExecutor().execute(() -> listener.onClose(result));
      return;
    }

medium

To complement the pre-start cancellation guard, PendingVRpc.start should check if isCancelled is already true when called, and if so, fast-fail immediately instead of queueing the RPC in pendingRpcs. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.

Suggested change
  synchronized (SessionPoolImpl.this) {
    if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
      VRpcResult result =
          VRpcResult.createUncommitedError(
              Status.UNAVAILABLE.withCause(
                  new IllegalStateException("SessionPool is closed")));
      ctx.getExecutor().execute(() -> listener.onClose(result));
      return;
    }
+   if (isCancelled) {
+     VRpcResult result = VRpcResult.createRejectedError(Status.CANCELLED);
+     ctx.getExecutor().execute(() -> listener.onClose(result));
+     return;
+   }
References
  1. In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.

Also swap Guava Objects.hashCode(id) for Long.hashCode(id) in
ChannelPoolDpImpl.AfeId to avoid per-call boxing and array allocation.
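
To make the allocation difference concrete, here is a small sketch. AfeIdSketch is a hypothetical stand-in for ChannelPoolDpImpl.AfeId, and varargsHash mimics the varargs form with Arrays.hashCode, which is what Guava's Objects.hashCode(Object...) delegates to.

```java
import java.util.Arrays;

// Hypothetical value class keyed by a primitive long id.
final class AfeIdSketch {
  private final long id;

  AfeIdSketch(long id) {
    this.id = id;
  }

  /** Varargs-style hash: boxes id into a Long and allocates a one-element Object[] per call. */
  int varargsHash() {
    return Arrays.hashCode(new Object[] {id});
  }

  /** Primitive hash: no boxing and no array allocation. */
  @Override
  public int hashCode() {
    return Long.hashCode(id);
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof AfeIdSketch && ((AfeIdSketch) other).id == id;
  }
}
```

The two forms return different values for the same id (the array form computes 31 + Long.hashCode(id)), which is harmless as long as the hash is only used in memory and never persisted or compared across versions.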