chore: refactor concurrency model in the bigtable session protocol #13485
igorbernstein2 wants to merge 24 commits into
Document lock ownership before later steps remove the lock. Add @GuardedBy to fields under the lock, move heartbeatInterval read into the synchronized block in startRpc(), and comment fields intentionally read outside the lock. No runtime change.
Replace ScheduledExecutorService with a BigtableTimer (Netty hashed wheel, will move in-tree later) for heartbeat, deadline, watchdog, AFE-prune, retry-create-session, and retry-delay. Owned by Client and shared across pools.
Replaces CancellableVRpc with a VOperation layer that sits above the VRpc chain rather than inside it. VOperationImpl owns the gRPC Context cancellation listener and constructs the per-op VRpcCallContext; downstream middleware just sees the chain.
VRpcCallContext.getExecutor() returns OpExecutor (thin wrapper with runningThread affinity tracking). VOperationImpl constructs the per-call SynchronizationContext + OpExecutor; RetryingVRpc drops its own SyncContext and dispatches via ctx.getExecutor(). The uncaught-handler safety net moves from RetryingVRpc up to VOperationImpl.
VRpcImpl.handle*() methods now dispatch listener callbacks via ctx.getExecutor(), with CAS STARTED->CLOSED in all three (handleError no longer proceeds from NEW) and decode moved into the executor task. RetryingVRpc.Active drops its own wrap since callbacks already arrive on the op executor. start() publishes ctx/listener only after winning the CAS so a racing duplicate can't corrupt the winner's fields. SessionPoolImpl's three direct listener.onClose paths also dispatch via ctx.getExecutor().
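The STARTED->CLOSED compare-and-set described above can be illustrated with a minimal sketch. The names `CasCloseSketch`, `State`, and `handleClose` are hypothetical and are not the PR's actual types; the sketch only shows how a CAS lets exactly one terminal handler win, and why a close arriving while the call is still NEW is rejected.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; not the PR's VRpcImpl.
final class CasCloseSketch {
  enum State { NEW, STARTED, CLOSED }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
  final AtomicInteger closeNotifications = new AtomicInteger();

  /** Returns true only for the single caller that moves NEW -> STARTED. */
  boolean start() {
    return state.compareAndSet(State.NEW, State.STARTED);
  }

  /** Returns true only for the single caller that moves STARTED -> CLOSED. */
  boolean handleClose() {
    if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
      return false; // never started, or another handler already closed the call
    }
    closeNotifications.incrementAndGet(); // stands in for listener.onClose
    return true;
  }
}
```

Because every terminal handler goes through the same CAS, a duplicate or late delivery falls through the `false` branch without touching the listener.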
SessionImpl gains sessionSyncContext that serializes stream callbacks. onMessage/onClose dispatch onto it, and the per-session heartbeat tick trampolines through it. synchronized(lock) blocks remain inside the handlers — the two coexist for now. Affinity asserts added at boundary methods and every handle*.
SessionImpl.startRpc and cancelRpc now submit to sessionSyncContext rather than running synchronously on the caller. VRpcSessionApi.startRpc is void — errors flow through rpc.handleError() onto ctx.getExecutor(). VRpcImpl drops its synchronous post-startRpc error branch.
All session state mutations now run on sessionSyncContext, so the per-session Object lock is no longer needed. Public methods (start, close, forceClose, startRpc, cancelRpc) submit onto sessionSyncContext; nextRpcId becomes AtomicLong for the cross-thread newCall() caller. handleVRpcResponse and handleVRpcErrorResponse drop the localCancel/localRpc capture-and-recheck dance — sessionSyncContext serializes them now. Stale lock-era comments on the fields are replaced with a sessionSyncContext-ownership note. SessionImplTest.testHeartbeat polls for the now-async nextHeartbeat update.
Split terminal close into notifyTerminalClose (per-target try/catch fan-out) and abortFromUncaughtException (global handler). Uncaught syncContext exceptions always set closeReason to ERROR — the prior reason is folded into the description so tracer/metrics correctly attribute aborts. Adds three regression tests (listener.onReady throws, onClose throws, both throw).
Client (and ShimImpl) own a dedicated bigtable-callback-%d cached pool, plumbed through *Async / TableBase. A blocked user callback can no longer starve heartbeats, retry delays, or pool bookkeeping (all of which run on backgroundExecutor). The op-level SerializingExecutor in a later commit will dispatch onto this pool.
VOperationImpl now constructs OpExecutor over a per-call SequentialExecutor on the shared userCallbackExecutor, replacing the per-call SynchronizationContext. OpExecutor gains an UncaughtExceptionHandler ctor arg — the safety net that the removed RetryingVRpc-owned SyncContext provided. The 3-arg VRpcCallContext.create defaults to a no-op handler for tests; production callers go through VOperationImpl.
CallOptions.withExecutor(MoreExecutors.directExecutor()) on the session stream so Netty I/O threads deliver SessionStream.Listener callbacks directly; sessionSyncContext immediately trampolines off them.
PendingVRpc.cancel and drainTo move isCancelled/realCall onto ctx.getExecutor(); the pool lock now covers only queue / poolState / session list. close() switches to cancelWithResult to honor the new op-executor contract. NOOP_CALL sentinel removed.
VOperationImpl captures opExecutor in start() and trampolines start/cancel via it. RetryingVRpc.start runs synchronously on the op-executor task; RetryingVRpc.cancel no longer wraps in execute. Tracer.onOperationStart is reordered before started=true (a throwing tracer short-circuits to direct listener.onClose). listener.onMessage failures classify as USER_FAILURE. CleanupListener tracks a closed flag to prevent gRPC-context listener leaks on synchronous chain close.
…en Client.close
OpExecutor switches from a SequentialExecutor backing to an internal ArrayDeque + drain loop, and gains runInline(), which runs the task synchronously on the caller thread when the executor is idle and otherwise queues it. VOperationImpl uses runInline for chain.start so the start dispatch skips the queue+drain round-trip. Drain rejections (e.g. during shutdown) reset drainScheduled so subsequent submissions can retry. Client.close drains userCallbackExecutor first via a 5s-bounded shutdownAndAwait so pool.close's cancelWithResult onClose notifications complete before the executor is torn down. Resource.close becomes idempotent.
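A rough sketch of a serializing executor with an ArrayDeque, a drain loop, and a run-inline-when-idle path might look like the following. This is not the PR's OpExecutor: the class name, the `draining` flag, and the exception routing are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical illustration; not the PR's OpExecutor.
final class InlineSerializingExecutorSketch implements Executor {
  private final Executor backing;
  private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
  private boolean draining; // guarded by queue: a drain is scheduled or running

  InlineSerializingExecutorSketch(Executor backing) {
    this.backing = backing;
  }

  /** Queues the task and schedules a drain on the backing executor if none is active. */
  @Override
  public void execute(Runnable task) {
    synchronized (queue) {
      queue.add(task);
      if (draining) {
        return; // the active drain will pick this task up in FIFO order
      }
      draining = true;
    }
    try {
      backing.execute(this::drain);
    } catch (RejectedExecutionException e) {
      synchronized (queue) {
        draining = false; // reset so a later submission can retry the drain
      }
      throw e;
    }
  }

  /** Runs on the caller thread when idle; otherwise queues behind pending tasks. */
  void runInline(Runnable task) {
    synchronized (queue) {
      queue.add(task);
      if (draining) {
        return;
      }
      draining = true;
    }
    drain(); // idle: drain on the caller thread
  }

  private void drain() {
    while (true) {
      Runnable next;
      synchronized (queue) {
        next = queue.poll();
        if (next == null) {
          draining = false;
          return;
        }
      }
      try {
        next.run();
      } catch (RuntimeException e) {
        // Keep draining; hand the failure to the thread's uncaught handler.
        Thread t = Thread.currentThread();
        t.getUncaughtExceptionHandler().uncaughtException(t, e);
      }
    }
  }
}
```

In this sketch, a task submitted through `runInline` while another drain is active is simply appended, so FIFO order with earlier `execute` calls is preserved.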
Tests that call Future.get() on vRpc chains can hang indefinitely if an exception breaks the callback dispatch chain and orphans the future (see ISSUE-006). A class-level JUnit 5 @Timeout(30) converts a silent hang into a clear test failure within a bounded time. Applied to: RetryingVRpcTest, VRpcTracerTest, ClientTest, TableBaseTest, SessionImplTest, SessionPoolImplTest.
PendingVRpc.start scheduled the deadline timer before the synchronized pool-state check, so the pool-closed early-return path returned without cancelling it. The timer fired later and called listener.onClose a second time with DEADLINE_EXCEEDED. RetryingVRpc.Active suppressed the duplicate at the user-facing layer via its currentState guard, but tracer.onAttemptFinish still ran a second time for the already-finished attempt, corrupting per-attempt metrics. Move the monitorDeadline call inside the synchronized block, after the pool-state check, so closed pools take the fast-fail path without scheduling. Adds SessionPoolImplTest#pendingVRpcOnClosedPoolDoesNotLeakDeadlineMonitor; verified the test fails against the pre-fix code.
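The ordering fix can be pictured with a small sketch in which a counter stands in for the deadline timer. All names here (`DeadlineOrderingSketch`, `timersScheduled`, `fastFails`) are hypothetical; the point is only that the state check and the scheduling share one critical section, with the check first.

```java
// Hypothetical illustration; not the PR's PendingVRpc.
final class DeadlineOrderingSketch {
  private final Object poolLock = new Object();
  private boolean poolStarted = true; // guarded by poolLock
  int timersScheduled; // stands in for monitorDeadline() calls
  int fastFails; // stands in for the pool-closed onClose path

  void closePool() {
    synchronized (poolLock) {
      poolStarted = false;
    }
  }

  void start() {
    synchronized (poolLock) {
      if (!poolStarted) {
        fastFails++; // closed pool: fail fast, nothing was scheduled yet
        return;
      }
      timersScheduled++; // only reached while the pool is still open
    }
  }
}
```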
sessionPools is Collections.newSetFromMap(new WeakHashMap<>()) — neither thread-safe nor synchronized. close() iterated it via forEach while openTableAsync / openAuthorizedViewAsync / openMaterializedViewAsync added to it concurrently, risking ConcurrentModificationException or a silently skipped just-added pool that close() believed it had closed. Wrap all four sites in synchronized(sessionPools) blocks. close() snapshots into an ArrayList under the monitor and iterates the copy outside, so the slow pool.close() calls don't block concurrent opens.
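The snapshot-then-iterate pattern described above might look roughly like this. `PoolRegistrySketch` and its nested `Pool` interface are hypothetical stand-ins, not the PR's Client or SessionPool types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;

// Hypothetical illustration; not the PR's Client.
final class PoolRegistrySketch {
  interface Pool {
    void close();
  }

  // Same construction as in the commit message: weak, and not thread-safe by itself.
  private final Set<Pool> pools = Collections.newSetFromMap(new WeakHashMap<>());

  void register(Pool pool) {
    synchronized (pools) {
      pools.add(pool);
    }
  }

  /** Copies the set under the monitor, then closes the copy outside it. */
  int closeAll() {
    List<Pool> snapshot;
    synchronized (pools) {
      snapshot = new ArrayList<>(pools);
    }
    for (Pool pool : snapshot) {
      pool.close(); // potentially slow; concurrent register() calls are not blocked
    }
    return snapshot.size();
  }
}
```

Holding the monitor only for the copy keeps the critical section short, at the cost that a pool registered after the snapshot is not closed by this call.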
notifyTerminalClose dereferenced closeReason without a null guard. Today every caller sets it first (forceClose, startGracefulClose's null-synth, dispatchStreamClosed's else branch, abortFromUncaughtException), so the invariant holds — but it is fragile: a future writer of WAIT_SERVER_CLOSE that forgets to set closeReason would NPE here. A precondition checkState would be wrong: the function's whole purpose is to fan out terminal notifications behind per-notification try/catch so one failure does not suppress the rest. A throw before any notification runs escapes to the sessionSyncContext uncaught handler, which finds the state already CLOSED and early-returns — all cleanup silently skipped. Mirror the synthesizer pattern from startGracefulClose: log a warning with an IllegalStateException for stack-trace observability, then build a fallback CloseSessionRequest so the rest of the fan-out runs unchanged.
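A minimal sketch of the fan-out shape, assuming string reasons and `Consumer` targets in place of the real close request and listeners. The names `TerminalFanOutSketch`, `FALLBACK_REASON`, and `notifyTargets` are hypothetical, and collected problems stand in for the logged warning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration; not the PR's SessionImpl.
final class TerminalFanOutSketch {
  static final String FALLBACK_REASON = "UNKNOWN (synthesized)";

  /** Notifies every target; returns the problems seen instead of throwing. */
  static List<RuntimeException> notifyTargets(
      String closeReason, List<Consumer<String>> targets) {
    List<RuntimeException> problems = new ArrayList<>();
    String reason = closeReason;
    if (reason == null) {
      // Record the broken invariant with a stack trace, then carry on with a fallback.
      problems.add(new IllegalStateException("closeReason was not set"));
      reason = FALLBACK_REASON;
    }
    for (Consumer<String> target : targets) {
      try {
        target.accept(reason);
      } catch (RuntimeException e) {
        problems.add(e); // one failing target must not suppress the rest
      }
    }
    return problems;
  }
}
```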
…lient.close
Client.close shut down userCallbackExecutor before draining the SessionPools
that depend on it, so late listener.onClose tasks from in-flight RPCs
arrived after backing was dead and got RejectedExecutionException — silently
stranding the user's terminal callbacks. The earlier fix sprinkled
inline-drain fallbacks inside OpExecutor; restructure shutdown instead so
the race can't happen.
SessionPool gains awaitTerminated(Duration), backed by a CompletableFuture
SessionPoolImpl completes from onSessionClose once the pool is CLOSED and
the last session has drained. close() no longer kills the watchdog —
awaitTerminated takes ownership of that, so the watchdog stays alive during
shutdown and can escalate any session stuck in WAIT_SERVER_CLOSE longer
than its tick interval (5 min) via forceClose.
Client.close becomes three explicit phases: (1) initiate graceful close on
each pool, (2) awaitTerminated on each with a 6-minute per-pool budget
(one full watchdog tick plus buffer), (3) tear down userCallbackExecutor /
channelPool / timers in the existing order, now safely because all
listener.onClose tasks are queued or drained before backing dies.
Add a `closed` AtomicBoolean + checkNotClosed() guard on the three openers
so concurrent opens during shutdown can't create pools the close path
won't see. close() is now idempotent via CAS on that flag.
Tests:
- ClientTest#openAfterCloseThrows: openTable/View/MaterializedView all
throw IllegalStateException after close
- ClientTest#closeIsIdempotent: double close is a no-op
- SessionPoolImplTest#awaitTerminatedReturnsTrueWhenPoolIsEmpty
- SessionPoolImplTest#awaitTerminatedReturnsTrueAfterSessionsDrain
- SessionPoolImplTest tearDown now calls awaitTerminated so the watchdog
is closed before testTimer.stop races its self-reschedule
- FakeSessionPool in TableBaseTest gains a no-op awaitTerminated stub
Drive-by: remove the spurious @Nested annotation from SessionPoolImplTest's
top-level class. @Nested is meaningful only on non-static inner classes;
on the outer class it caused Surefire to mis-attribute test counts (outer
tests reported under $RetrySessionCreation). Tests were executed correctly
either way, just the reporting was wrong.
Full module suite (2375 tests) passes in 3:15, unchanged from before.
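The `closed` flag guard from this commit can be sketched as follows. `ClosableClientSketch` is a hypothetical stand-in: it omits pool registration and the three shutdown phases, and only shows the CAS-based idempotent close plus the open-after-close check.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not the PR's Client.
final class ClosableClientSketch {
  private final AtomicBoolean closed = new AtomicBoolean();
  final AtomicInteger teardowns = new AtomicInteger();

  private void checkNotClosed() {
    if (closed.get()) {
      throw new IllegalStateException("client is closed");
    }
  }

  /** Stands in for the three openers; returns a placeholder handle. */
  String openTable(String name) {
    checkNotClosed();
    return "pool:" + name;
  }

  /** Only the caller that flips the flag performs the teardown. */
  void close() {
    if (!closed.compareAndSet(false, true)) {
      return; // already closed: no-op
    }
    teardowns.incrementAndGet(); // stands in for the phased shutdown
  }
}
```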
VOperationImpl.start read wrapped.closed on the caller thread and then
called grpcContext.addListener if false. If chain.start had asynchronously
queued an onClose (PendingVRpc pool-closed fast-fail, VRpcImpl
deadline-exceeded short-circuit), the backing-thread drain could land
between the read and the addListener call — CleanupListener.onClose would
set closed=true and call removeListener as a no-op pre-registration, and
the caller would then register a listener with nothing to remove it.
The leak is per-RPC, permanent until grpcContext cancels. For long-lived
application contexts that pin chain → opExecutor → userCallbackExecutor,
this accumulates indefinitely.
Queue the addListener through exec.runInline so FIFO ordering with the
op executor guarantees soundness: any onClose chain.start enqueued drains
first, so wrapped.closed is accurate by the time we evaluate it. If
grpcContext is cancelled between start() returning and the queued task
running, addListener+directExecutor fires the cancellationListener
immediately at registration time, so cancel still propagates correctly.
runInline (not execute) is the right verb here: when chain.start enqueued
nothing (common path) the executor is idle and the registration body runs
inline on the caller thread — no extra context switch. When chain.start
did enqueue, runInline takes the queue branch and FIFO drains both.
Adds VOperationImplTest with two tests:
- grpcContextCancelPropagatesToChain: normal-path sanity that
addListener still wires cancellation through to chain.cancel
- asyncOnCloseFromChainDoesNotPropagateLaterContextCancel: regression —
with an async-queued onClose, the cancellationListener must not be
registered, so a later grpcContext.cancel does not reach chain.cancel
Active.onClose called tracer.onAttemptFinish before the currentState != Active.this guard, so a discarded onClose still counted as an attempt finish. Today nothing reaches this path with a stale Active (VRpcImpl's CAS blocks late deliveries upstream), but the ordering is a latent hazard: any future code change that lets a late onClose through would double-fire tracer.onAttemptFinish for the same attempt and skew per-attempt metrics. Swap the two lines so the guard runs first, matching onMessage above. No behavior change today since the guard doesn't fire in current code paths; this is structural defense.
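The guard-first ordering can be shown with a tiny sketch in which a counter stands in for tracer.onAttemptFinish. `StaleCallbackGuardSketch` and its members are hypothetical names, not the PR's RetryingVRpc.

```java
// Hypothetical illustration; not the PR's RetryingVRpc.Active.
final class StaleCallbackGuardSketch {
  private Object currentAttempt;
  int attemptFinishes; // stands in for tracer.onAttemptFinish calls

  /** Starts a new attempt, which makes any earlier attempt stale. */
  Object newAttempt() {
    currentAttempt = new Object();
    return currentAttempt;
  }

  /** Returns false, with no side effects, when the callback is stale. */
  boolean onClose(Object attempt) {
    if (currentAttempt != attempt) {
      return false; // guard first: a stale callback must not reach the tracer
    }
    attemptFinishes++;
    currentAttempt = null;
    return true;
  }
}
```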
ShimImpl registered its userCallbackExecutor with a bare shutdown() closer, while Client.create's path uses shutdownAndAwait (which gives in-flight listener.onClose tasks a 5-second drain window before shutdownNow). On ShimImpl close, queued callbacks were abandoned mid-flight — fine for quiescent shutdowns but a regression for fast-close patterns (test boundaries, dynamic config reloads) where in-flight callbacks have not yet drained. Promote Client.shutdownAndAwait from private-static to public-static so ShimImpl (different package) can reuse the same shutdown semantics, and update ShimImpl to call it.
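A helper with the semantics described here (orderly shutdown, a bounded drain window, then shutdownNow) is commonly written along the following lines. This is a generic sketch of that pattern using only java.util.concurrent, not the PR's actual Client.shutdownAndAwait; the class name and the boolean return value are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the shutdown-then-await pattern.
final class ShutdownSketch {
  /** Returns true if already-submitted tasks finished within the timeout. */
  static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown(); // stop accepting new tasks; queued tasks keep running
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
      executor.shutdownNow(); // drain window expired: interrupt what is left
      return false;
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

A caller wanting the 5-second window mentioned above would pass `5, TimeUnit.SECONDS`.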
Code Review
This pull request refactors the threading and execution model of the Bigtable client by introducing a per-operation serializing executor (OpExecutor), a unified operation wrapper (VOperationImpl), and a hashed-wheel timer (BigtableTimer/NettyWheelTimer) to handle scheduling tasks like heartbeats and deadlines without stalling Netty I/O threads. Additionally, SessionImpl and SessionPoolImpl are refactored to use SynchronizationContext instead of synchronized blocks. The review feedback highlights critical thread-safety and robustness issues, including potential visibility bugs from removing volatile on exposed session parameters, a potential NullPointerException in PendingVRpc.cancel when ctx is null, and the need to guard PendingVRpc.start against already-cancelled operations.
  private OpenParams openParams;

- private volatile boolean openParamsUpdated;
+ private boolean openParamsUpdated;
Removing the volatile keyword from openParams and openParamsUpdated introduces a Java Memory Model (JMM) visibility and data-race issue. Even if there are no off-context production readers today, these fields are exposed via public getters (such as getOpenParams() and isOpenParamsUpdated()). Any external thread (like SessionPoolImpl or background threads) calling these getters directly without executing on the sessionSyncContext could read stale values or fail to see updates entirely. Keeping them volatile is crucial for thread safety and defensive programming.
- private OpenParams openParams;
- private volatile boolean openParamsUpdated;
- private boolean openParamsUpdated;
+ private volatile OpenParams openParams;
+ private volatile boolean openParamsUpdated;
private void cancel(Status status, boolean onlyCancelPendingCall) {
  boolean delegateToRealCall = true;
  synchronized (SessionPoolImpl.this) {
    if (isCancelled) {
      return;
    }
    pendingRpcs.remove(this); // eager removal; no-op if already drained
  }
  ctx.getExecutor().execute(() -> {
If cancel is called on a PendingVRpc before start() has been invoked, ctx will be null, leading to a NullPointerException when calling ctx.getExecutor(). To prevent this, we should check if ctx is null under the SessionPoolImpl.this monitor lock, set isCancelled = true, and return early. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.
private void cancel(Status status, boolean onlyCancelPendingCall) {
  synchronized (SessionPoolImpl.this) {
    pendingRpcs.remove(this); // eager removal; no-op if already drained
    if (ctx == null) {
      isCancelled = true;
      return;
    }
  }
  ctx.getExecutor().execute(() -> {
References
- In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
  synchronized (SessionPoolImpl.this) {
    if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
-     listener.onClose(
+     VRpcResult result =
          VRpcResult.createUncommitedError(
              Status.UNAVAILABLE.withCause(
-                 new IllegalStateException("SessionPool is closed"))));
+                 new IllegalStateException("SessionPool is closed")));
+     ctx.getExecutor().execute(() -> listener.onClose(result));
      return;
    }
To complement the pre-start cancellation guard, PendingVRpc.start should check if isCancelled is already true when called, and if so, fast-fail immediately instead of queueing the RPC in pendingRpcs. Note: While we use synchronized here to match the existing locking strategy of SessionPoolImpl, in performance-sensitive code, explicit locks are preferred over the synchronized keyword to protect shared state.
- synchronized (SessionPoolImpl.this) {
-   if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
-     listener.onClose(
-     VRpcResult result =
-         VRpcResult.createUncommitedError(
-             Status.UNAVAILABLE.withCause(
-                 new IllegalStateException("SessionPool is closed"))));
-                 new IllegalStateException("SessionPool is closed")));
-     ctx.getExecutor().execute(() -> listener.onClose(result));
-     return;
-   }
+ synchronized (SessionPoolImpl.this) {
+   if (SessionPoolImpl.this.poolState != PoolState.STARTED) {
+     VRpcResult result =
+         VRpcResult.createUncommitedError(
+             Status.UNAVAILABLE.withCause(
+                 new IllegalStateException("SessionPool is closed")));
+     ctx.getExecutor().execute(() -> listener.onClose(result));
+     return;
+   }
+   if (isCancelled) {
+     VRpcResult result = VRpcResult.createRejectedError(Status.CANCELLED);
+     ctx.getExecutor().execute(() -> listener.onClose(result));
+     return;
+   }
References
- In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
Also swap Guava Objects.hashCode(id) for Long.hashCode(id) in ChannelPoolDpImpl.AfeId to avoid per-call boxing and array allocation.
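The allocation difference can be sketched with the JDK alone. This example uses `java.util.Objects.hash` as a stand-in for Guava's varargs `Objects.hashCode`; both are assumed here to delegate to `Arrays.hashCode(Object[])`, which is what boxes the `long` and allocates the array. `AfeIdHashSketch` is a hypothetical name.

```java
import java.util.Objects;

// Hypothetical illustration; not the PR's ChannelPoolDpImpl.AfeId.
final class AfeIdHashSketch {
  /** Varargs form: boxes id into a Long and wraps it in a one-element Object[]. */
  static int varargsHash(long id) {
    return Objects.hash(id);
  }

  /** Primitive form: no boxing and no array allocation. */
  static int primitiveHash(long id) {
    return Long.hashCode(id);
  }
}
```

The two forms return different values for the same id (the varargs form adds the `31 * 1` seed from `Arrays.hashCode`), which only matters if hash values are persisted or compared across versions.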