Threading refactor#13286
Conversation
- Add @GuardedBy("lock") to sessionParameters (was missing annotation). - Move heartbeatInterval read in startRpc() inside synchronized(lock) to eliminate a data race with handleSessionParamsResponse(). - Add field-level comments on sessionListener, openParams, openParamsUpdated, closeReason, heartbeatInterval, and nextHeartbeat documenting why each is read outside the lock and what the safety argument is. - Document the @SuppressWarnings("GuardedBy") on both SessionPoolImpl constructors (constructor-time writes are safe; object not yet published). - Document the Watchdog Object lock parameter explaining it is the pool-wide monitor and will be given a proper type in Phase 5. - Add ISSUES.md tracking pre-existing correctness/clarity gaps found during the refactor review. No runtime behavior changes. Tests pass unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…l stores ctx - Add abstract Executor getExecutor() to VRpcCallContext. The default from VRpcCallContext.create() is MoreExecutors.directExecutor(), which runs tasks inline — no threading change at this step. - Thread the executor through createForNextAttempt() so retry attempts inherit the same serialization point as the original attempt. - VRpcImpl.start() now stores the VRpcCallContext in this.ctx so that the handle*() methods can dispatch to ctx.getExecutor() in Step 4. No runtime behavior changes. Tests pass unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…o TableBase TableBase.readRow() and mutateRow() now create a SynchronizationContext per call and pass it to both the RetryingVRpc constructor and VRpcCallContext.create(), so ctx.getExecutor() returns the same object that RetryingVRpc uses for serialization. The AtomicReference forward-reference pattern breaks the circular dependency between the SyncContext uncaught-exception handler and the RetryingVRpc it must cancel. VRpcCallContext.create() gains a 4-arg overload accepting an explicit Executor; the existing 3-arg version delegates to it with directExecutor() unchanged. Behavior is identical — same SynchronizationContext, just created one level higher. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…to VRpcImpl VRpcImpl.handleResponse(), handleError(), and handleSessionClose() now wrap their listener.onClose() / listener.onMessage() calls in ctx.getExecutor().execute(), so all callbacks are guaranteed to run inside the op SynchronizationContext. The syncContext.execute() wrappers are removed from RetryingVRpc.Active's onMessage and onClose — they now run directly inside the already-dispatched executor task. Scheduled retry delay switches from syncContext.schedule(..., executor) to executor.schedule(() -> ctx.getExecutor().execute(task), delay), using a ScheduledFuture<?> in place of SynchronizationContext.ScheduledHandle. SessionPoolImpl also had three direct listener.onClose() calls that bypassed ctx.getExecutor(): the consecutive-failure close path (onSessionClose), the pool-closed-at-start path (PendingVRpc.start), and the no-realcall cancel path (PendingVRpc.cancel). All three are fixed to dispatch through ctx.getExecutor(). Same fix applied to VRpcImpl.start()'s inline error path. Added ISSUE-006 documenting the silent-hang failure mode that arises when an exception escapes an executor task and orphans the caller's future. Test call sites in RetryingVRpcTest and VRpcTracerTest updated to pass syncContext to VRpcCallContext.create() — without this, ctx.getExecutor() returned directExecutor and callbacks ran on the gRPC thread, failing the throwIfNotInThisSynchronizationContext check. Behavior is identical. Tests pass unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Tests that call Future.get() on vRpc chains can hang indefinitely if an exception breaks the callback dispatch chain and orphans the future (see ISSUE-006). A class-level JUnit 5 @timeout(30) converts a silent hang into a clear test failure within a bounded time. Applied to: RetryingVRpcTest, VRpcTracerTest, ClientTest, TableBaseTest, SessionImplTest, SessionPoolImplTest. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ing lock SessionImpl gets a sessionSyncContext that serializes all session stream callbacks (onMessage/onClose). The existing synchronized(lock) blocks remain inside those handlers — the lock and SyncContext now coexist. Heartbeat monitoring moves from a pool-wide scheduled task (SessionList. checkHeartbeat) into per-session checks dispatched onto sessionSyncContext, which is the prerequisite for removing nextHeartbeat's volatile in Step 7. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…yncContext VRpcSessionApi.startRpc() changes from returning Status to void. SessionImpl now submits startRpc and cancelRpc work to sessionSyncContext instead of running it synchronously under the pool lock. Errors from startRpc are delivered via rpc.handleError() which dispatches to ctx.getExecutor(). VRpcImpl.start() removes the synchronous error-return path — all session-level errors now arrive asynchronously through the listener chain. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All session state mutations now happen exclusively on sessionSyncContext, so the Object lock is no longer needed. External readers of `state` and `lastStateChangedAt` get volatile visibility; `nextRpcId` becomes an AtomicLong for the external `newCall()` caller. Public methods that mutate state (start, close, forceClose) are wrapped in sessionSyncContext.execute() so they serialize against stream callbacks. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…backExecutor) Replace the per-op SynchronizationContext in RetryingVRpc/TableBase with a SequentialExecutor (via MoreExecutors.newSequentialExecutor) backed by a userCallbackExecutor. This ensures user callbacks always run on a pool thread rather than inline on the gRPC callback thread. RetryingVRpc now accepts a plain Executor (the sequential executor) instead of a SynchronizationContext. TableBase separates the scheduling executor (backgroundExecutor) from the callback serialization executor (userCallbackExecutor); in production both point at the same thread pool, but in tests directExecutor() gives deterministic inline execution. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pass CallOptions.withExecutor(MoreExecutors.directExecutor()) when creating the ClientCall for session streams in both SingleChannelPool and ChannelPoolDpImpl. Netty I/O threads now deliver SessionStream.Listener callbacks directly without going through the gRPC user thread pool. All callbacks immediately trampoline through SessionImpl.sessionSyncContext (fast, bounded bookkeeping) then dispatch user-facing work onto the op-level SerializingExecutor and return, keeping the I/O thread free. Added the invariant comment to SessionStream.Listener and the sessionSyncContext field documenting this contract. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
isCancelled and realCall are now exclusively owned by ctx.getExecutor() (the per-op SerializingExecutor), eliminating the cancel/drainTo race that required the NOOP_CALL sentinel. cancel() eagerly removes from pendingRpcs under the pool lock, then dispatches the state transition to the op executor. drainTo() cancels the deadline monitor before dispatching, and checks isCancelled on the op executor so a cancelled-before-drain rpc returns its session cleanly via onVRpcComplete(). popClosableRpcs() and tryDrainPendingRpcs() are simplified now that cancelled rpcs are removed eagerly. close() copies pendingRpcs before clearing to avoid ConcurrentModificationException when cancel() re-acquires the pool lock. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ISSUE-007 documents that PendingVRpc.monitorDeadline() and per-session heartbeat scheduling both contribute to ScheduledExecutorService heap churn. At ~100ms heartbeat intervals (10 fires/sec/session) and ~1ms vRPC p50, cancelled deadline futures accumulate as zombies in the DelayQueue, inflating O(log n) insert cost for heartbeats. Mitigations ranked from quick fix to long-term solution. Step 5.5 added to THREADING_REFACTOR_PLAN.md between Steps 5 and 6: switch both heartbeat and deadline monitoring to a pool-internal ScheduledThreadPoolExecutor with setRemoveOnCancelPolicy(true), eliminating zombie accumulation while avoiding new dependencies. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
There was a problem hiding this comment.
Code Review
This pull request implements key phases of the threading refactor plan for the session-based transport, replacing the global session lock with a per-session SynchronizationContext and routing op-level callbacks through a sequential opExecutor. It also configures gRPC session streams to use DirectExecutor for fast, non-blocking execution on Netty I/O threads and cleans up PendingVRpc state management. The review feedback highlights two important issues: first, removing the volatile modifier from nextHeartbeat can cause infinite loops in tests due to JIT compiler hoisting; second, performing O(N) removals on the pendingRpcs queue under the global lock during cancellation can lead to severe lock contention under high load.
| Duration.ofMillis(Durations.toMillis(sessionParameters.getKeepAlive())); | ||
|
|
||
| private volatile Instant nextHeartbeat; | ||
| private Instant nextHeartbeat; |
There was a problem hiding this comment.
The field nextHeartbeat was changed from volatile Instant to Instant. However, getNextHeartbeat() is a public/package-private getter that is accessed from other threads, specifically in the polling loop of SessionImplTest.java (while (!session.getNextHeartbeat().equals(expectedHeartbeat) ...). Without the volatile modifier, the JIT compiler is allowed to hoist the read of nextHeartbeat out of the loop, leading to an infinite loop/hang and causing the test to time out. To comply with the Java Memory Model and ensure thread safety and visibility, nextHeartbeat must remain volatile.
| private Instant nextHeartbeat; | |
| private volatile Instant nextHeartbeat; |
| synchronized (SessionPoolImpl.this) { | ||
| if (isCancelled) { | ||
| return; | ||
| } | ||
| pendingRpcs.remove(this); // eager removal; no-op if already drained | ||
| } |
There was a problem hiding this comment.
In cancel(Status, boolean), pendingRpcs.remove(this) is called under the global SessionPoolImpl.this lock. Since pendingRpcs is an ArrayDeque, the remove(Object) operation has O(N) time complexity as it requires a linear scan of the queue and shifting elements. Under high load or transient queuing (where queue depth can reach tens of thousands of entries as noted in ISSUES.md), this linear scan under a global lock can cause severe lock contention and CPU overhead. Additionally, in performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility. Consider either reverting to the previous lazy removal strategy (where cancelled RPCs are lazily popped from the queue during drain, which is O(1)), or changing pendingRpcs to a data structure that supports O(1) arbitrary removal while preserving FIFO order, such as a LinkedHashSet (or a set backed by LinkedHashMap), and protecting it with an explicit lock.
References
- In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
There was a problem hiding this comment.
This review comment seems reasonable?
| Executor opExecutor = MoreExecutors.newSequentialExecutor(userCallbackExecutor); | ||
| RetryingVRpc<SessionReadRowRequest, SessionReadRowResponse> retry = | ||
| new RetryingVRpc<>(() -> sessionPool.newCall(readRowDescriptor), backgroundExecutor); | ||
| new RetryingVRpc<>(() -> sessionPool.newCall(readRowDescriptor), backgroundExecutor, opExecutor); |
There was a problem hiding this comment.
why are we injecting the opExecutor in the middle of middleware chain? I think it needs to be inject at the top of the chain. Maybe we need a middleware to create the VRpcContext
| sessionPool.start(openReq, new Metadata()); | ||
|
|
||
| return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor); | ||
| return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor, executor); |
There was a problem hiding this comment.
Background executor and the userCallbackExecutor should not be the same thing. The userCallbackExecutor should default to a CachedThreadPool, and we should respect the user's override from the compat layer in gax's InstantiatingGrpcChannelProvider
| @Override | ||
| public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) { | ||
| syncContext.execute( | ||
| opExecutor.execute( |
There was a problem hiding this comment.
Should this be moved out to the start of the middleware chain?
| @Override | ||
| public void cancel(@Nullable String message, @Nullable Throwable cause) { | ||
| syncContext.execute( | ||
| opExecutor.execute( |
There was a problem hiding this comment.
same as above, should this move out to the start of the middleware chain?
| .build(); | ||
|
|
||
| private final Clock clock; | ||
| private final ScheduledExecutorService scheduledExecutor; |
There was a problem hiding this comment.
This was previously implemented as a single task per pool. Now its per vRpc.
Please a TODO to use a HashedWheelTimer to minimize the overhead of the change
| Level.WARNING, | ||
| String.format( | ||
| "Unhandled exception in session SynchronizationContext for %s", | ||
| info.getLogName()), |
There was a problem hiding this comment.
We need better error handling...if something throw an exception, then it will leave the session in a bad state. I think unhandled exceptions need to cancel the session as a whole
| forceClose(MISSED_HEARTBEAT_CLOSE_REQUEST); | ||
| return; | ||
| } | ||
| scheduleHeartbeatCheck(); |
There was a problem hiding this comment.
since the heartbeats moved per session, do we still need to track clock.instant().isAfter(nextHeartbeat)? shouldnt it just use the task just be scheduled for the next check point?
| SessionImpl.HEARTBEAT_CHECK_INTERVAL.toMillis(), | ||
| SessionImpl.HEARTBEAT_CHECK_INTERVAL.toMillis(), | ||
| TimeUnit.MILLISECONDS); | ||
| // Heartbeat monitoring is now done per-session via SessionImpl.sessionSyncContext. |
There was a problem hiding this comment.
this comment doesnt make sense outside of this pr
| sessionPool.start(openReq, new Metadata()); | ||
|
|
||
| return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor); | ||
| return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor, executor); |
There was a problem hiding this comment.
Is this right? I don't think we want to use the same executor for user callbacks and background tasks?
| ClientCall<SessionRequest, SessionResponse> innerCall = | ||
| channelWrapper.channel.newCall(desc, callOptions); | ||
| channelWrapper.channel.newCall( | ||
| desc, callOptions.withExecutor(MoreExecutors.directExecutor())); |
There was a problem hiding this comment.
Can we use directExecutor?
Avoid directExecutor() for Blocking Tasks: Using directExecutor() runs the RPC handler on the same thread that handles network I/O. While this can improve performance by reducing context switching, it is dangerous if your code is not 100% non-blocking; any delay will block the network event loop and halt communication for all other connections.
| public RetryingVRpc(Supplier<VRpc<ReqT, RespT>> supplier, ScheduledExecutorService executor) { | ||
| public RetryingVRpc( | ||
| Supplier<VRpc<ReqT, RespT>> supplier, | ||
| ScheduledExecutorService scheduledExecutor, |
There was a problem hiding this comment.
maybe
| ScheduledExecutorService scheduledExecutor, | |
| ScheduledExecutorService backgroundExecutor, |
|
|
||
| public static VRpcCallContext create( | ||
| Deadline deadline, boolean isIdempotent, VRpcTracer tracer) { | ||
| return create(deadline, isIdempotent, tracer, MoreExecutors.directExecutor()); |
There was a problem hiding this comment.
should we remove this method? or should we pass in MoreExecutors.newSequentialExecutor(directExectuor)?
| this.sessionSyncContext = | ||
| new SynchronizationContext( | ||
| (thread, e) -> | ||
| logger.log( |
There was a problem hiding this comment.
do we need to close the stream when this happens?
| updateState(SessionState.READY); | ||
| tracer.onOpen(localPeerInfo); | ||
| sessionListener.onReady(openSession); | ||
| scheduleHeartbeatCheck(); |
There was a problem hiding this comment.
heartbeat check should only be scheduled on in use sessions. So this probably should move to startRpc, and cancel in handleSessionResponse.
| VRpcResult.createUncommitedError( | ||
| Status.UNAVAILABLE.withCause( | ||
| new IllegalStateException("SessionPool is closed")))); | ||
| ctx.getExecutor() |
There was a problem hiding this comment.
why do we use executor for vrpcs but syncContext in sessions ? what's the difference ?
| synchronized (SessionPoolImpl.this) { | ||
| if (isCancelled) { | ||
| return; | ||
| } | ||
| pendingRpcs.remove(this); // eager removal; no-op if already drained | ||
| } |
There was a problem hiding this comment.
This review comment seems reasonable?
| // TODO: fix lock sharing | ||
| // The `lock` parameter is the pool-wide monitor (SessionPoolImpl.this). It is typed as Object | ||
| // because Watchdog is a static nested class and cannot reference the outer instance type in its | ||
| // constructor signature without creating a circular dependency. Phase 5 will replace this with |
There was a problem hiding this comment.
I think we can remove the comment about "phase 5".
No description provided.