
Threading refactor#13286

Open
igorbernstein2 wants to merge 12 commits into
googleapis:mainfrom
igorbernstein2:threading-refactor


@igorbernstein2
Contributor

No description provided.

igorbernstein2 and others added 12 commits May 27, 2026 20:25
- Add @GuardedBy("lock") to sessionParameters (was missing annotation).
- Move heartbeatInterval read in startRpc() inside synchronized(lock) to
  eliminate a data race with handleSessionParamsResponse().
- Add field-level comments on sessionListener, openParams, openParamsUpdated,
  closeReason, heartbeatInterval, and nextHeartbeat documenting why each is
  read outside the lock and what the safety argument is.
- Document the @SuppressWarnings("GuardedBy") on both SessionPoolImpl
  constructors (constructor-time writes are safe; object not yet published).
- Document the Watchdog Object lock parameter explaining it is the pool-wide
  monitor and will be given a proper type in Phase 5.
- Add ISSUES.md tracking pre-existing correctness/clarity gaps found during
  the refactor review.

No runtime behavior changes. Tests pass unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
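
The data-race fix described in this commit (reading a lock-guarded field only while holding the lock) can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `HeartbeatState`, `updateInterval`, and `readInterval` are hypothetical names, and the `@GuardedBy` annotation appears only as a comment so that the block compiles with the JDK alone.

```java
import java.time.Duration;

final class HeartbeatState {
  private final Object lock = new Object();

  // Would carry @GuardedBy("lock") in annotated code; kept as a comment to avoid a dependency.
  private Duration heartbeatInterval = Duration.ofSeconds(1);

  // Writer path (hypothetical stand-in for handleSessionParamsResponse()).
  void updateInterval(Duration newInterval) {
    synchronized (lock) {
      heartbeatInterval = newInterval;
    }
  }

  // Reader path (hypothetical stand-in for startRpc()): the read happens inside the
  // same monitor as the write, so the two cannot race.
  Duration readInterval() {
    synchronized (lock) {
      return heartbeatInterval;
    }
  }
}
```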
…l stores ctx

- Add abstract Executor getExecutor() to VRpcCallContext. The default from
  VRpcCallContext.create() is MoreExecutors.directExecutor(), which runs tasks
  inline — no threading change at this step.
- Thread the executor through createForNextAttempt() so retry attempts inherit
  the same serialization point as the original attempt.
- VRpcImpl.start() now stores the VRpcCallContext in this.ctx so that the
  handle*() methods can dispatch to ctx.getExecutor() in Step 4.

No runtime behavior changes. Tests pass unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…o TableBase

TableBase.readRow() and mutateRow() now create a SynchronizationContext per
call and pass it to both the RetryingVRpc constructor and VRpcCallContext.create(),
so ctx.getExecutor() returns the same object that RetryingVRpc uses for
serialization. The AtomicReference forward-reference pattern breaks the circular
dependency between the SyncContext uncaught-exception handler and the RetryingVRpc
it must cancel.

VRpcCallContext.create() gains a 4-arg overload accepting an explicit Executor;
the existing 3-arg version delegates to it with directExecutor() unchanged.

Behavior is identical — same SynchronizationContext, just created one level higher.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
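
The AtomicReference forward-reference pattern mentioned in this commit might look roughly like the sketch below. It is simplified and uses assumed names: `Op` stands in for the cancellable RetryingVRpc, and a plain `Consumer<Throwable>` stands in for the SynchronizationContext's uncaught-exception handler.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ForwardReferenceDemo {
  // Hypothetical stand-in for an operation that can be cancelled.
  static final class Op {
    final Consumer<Throwable> onUncaught;
    volatile String cancelReason;

    Op(Consumer<Throwable> onUncaught) {
      this.onUncaught = onUncaught;
    }

    void cancel(String reason) {
      cancelReason = reason;
    }
  }

  static Op create() {
    // The handler has to exist before the Op does, so it captures a holder instead of the Op.
    AtomicReference<Op> ref = new AtomicReference<>();
    Consumer<Throwable> handler =
        t -> {
          Op op = ref.get();
          if (op != null) { // null only if the handler fires before the holder is filled
            op.cancel("uncaught: " + t.getMessage());
          }
        };
    Op op = new Op(handler);
    ref.set(op); // close the loop once both objects exist
    return op;
  }
}
```

The holder is filled immediately after construction, so the only window in which the handler sees `null` is before any work has been submitted.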
…to VRpcImpl

VRpcImpl.handleResponse(), handleError(), and handleSessionClose() now wrap their
listener.onClose() / listener.onMessage() calls in ctx.getExecutor().execute(),
so all callbacks are guaranteed to run inside the op SynchronizationContext.

The syncContext.execute() wrappers are removed from RetryingVRpc.Active's onMessage
and onClose — they now run directly inside the already-dispatched executor task.
Scheduled retry delay switches from syncContext.schedule(..., executor) to
executor.schedule(() -> ctx.getExecutor().execute(task), delay), using a
ScheduledFuture<?> in place of SynchronizationContext.ScheduledHandle.

SessionPoolImpl also had three direct listener.onClose() calls that bypassed
ctx.getExecutor(): the consecutive-failure close path (onSessionClose), the
pool-closed-at-start path (PendingVRpc.start), and the no-realcall cancel path
(PendingVRpc.cancel). All three are fixed to dispatch through ctx.getExecutor().
Same fix applied to VRpcImpl.start()'s inline error path.

Added ISSUE-006 documenting the silent-hang failure mode that arises when an
exception escapes an executor task and orphans the caller's future.

Test call sites in RetryingVRpcTest and VRpcTracerTest updated to pass syncContext
to VRpcCallContext.create() — without this, ctx.getExecutor() returned directExecutor
and callbacks ran on the gRPC thread, failing the throwIfNotInThisSynchronizationContext
check.

Behavior is identical. Tests pass unchanged.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Tests that call Future.get() on vRpc chains can hang indefinitely if an
exception breaks the callback dispatch chain and orphans the future (see
ISSUE-006). A class-level JUnit 5 @Timeout(30) converts a silent hang into
a clear test failure within a bounded time.

Applied to: RetryingVRpcTest, VRpcTracerTest, ClientTest, TableBaseTest,
SessionImplTest, SessionPoolImplTest.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ing lock

SessionImpl gets a sessionSyncContext that serializes all session stream
callbacks (onMessage/onClose). The existing synchronized(lock) blocks remain
inside those handlers — the lock and SyncContext now coexist.

Heartbeat monitoring moves from a pool-wide scheduled task (SessionList.
checkHeartbeat) into per-session checks dispatched onto sessionSyncContext,
which is the prerequisite for removing nextHeartbeat's volatile in Step 7.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…yncContext

VRpcSessionApi.startRpc() changes from returning Status to void. SessionImpl
now submits startRpc and cancelRpc work to sessionSyncContext instead of
running it synchronously under the pool lock. Errors from startRpc are
delivered via rpc.handleError() which dispatches to ctx.getExecutor().

VRpcImpl.start() removes the synchronous error-return path — all session-level
errors now arrive asynchronously through the listener chain.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All session state mutations now happen exclusively on sessionSyncContext,
so the Object lock is no longer needed. External readers of `state` and
`lastStateChangedAt` get volatile visibility; `nextRpcId` becomes an
AtomicLong for the external `newCall()` caller. Public methods that
mutate state (start, close, forceClose) are wrapped in
sessionSyncContext.execute() so they serialize against stream callbacks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
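
A minimal sketch of the single-writer arrangement this commit describes, under stated assumptions: a single-thread `ExecutorService` stands in for sessionSyncContext, and the class and method names (`SingleWriterSession`, `newRpcId`, `shutdownAndWait`) are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class SingleWriterSession {
  enum State { NEW, READY, CLOSED }

  // Stand-in for sessionSyncContext: one thread, so submitted tasks never overlap.
  private final ExecutorService serializer = Executors.newSingleThreadExecutor();

  // Written only from serializer tasks; volatile gives external readers visibility.
  private volatile State state = State.NEW;

  // Incremented by arbitrary caller threads, so it needs atomic updates.
  private final AtomicLong nextRpcId = new AtomicLong();

  // Public mutators hop onto the serializer instead of taking a lock.
  void start() {
    serializer.execute(() -> state = State.READY);
  }

  void close() {
    serializer.execute(() -> state = State.CLOSED);
  }

  State getState() {
    return state;
  }

  long newRpcId() {
    return nextRpcId.getAndIncrement();
  }

  // Demo-only helper: stops accepting work and waits for queued mutations to run.
  boolean shutdownAndWait() {
    serializer.shutdown();
    try {
      return serializer.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```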
…backExecutor)

Replace the per-op SynchronizationContext in RetryingVRpc/TableBase with a
SequentialExecutor (via MoreExecutors.newSequentialExecutor) backed by a
userCallbackExecutor. This ensures user callbacks always run on a pool
thread rather than inline on the gRPC callback thread.

RetryingVRpc now accepts a plain Executor (the sequential executor) instead
of a SynchronizationContext. TableBase separates the scheduling executor
(backgroundExecutor) from the callback serialization executor
(userCallbackExecutor); in production both point at the same thread pool,
but in tests directExecutor() gives deterministic inline execution.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
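
To illustrate what a sequential executor layered on a thread pool provides, here is a simplified JDK-only sketch. It is not Guava's implementation of `MoreExecutors.newSequentialExecutor`; `SimpleSequentialExecutor` is a hypothetical class that only demonstrates the idea: tasks run one at a time, in submission order, on whatever thread the delegate supplies.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

final class SimpleSequentialExecutor implements Executor {
  private final Executor delegate;
  private final Queue<Runnable> queue = new ArrayDeque<>();
  private boolean draining; // guarded by queue

  SimpleSequentialExecutor(Executor delegate) {
    this.delegate = delegate;
  }

  @Override
  public void execute(Runnable task) {
    synchronized (queue) {
      queue.add(task);
      if (draining) {
        return; // a drain is already scheduled or running; it will pick this task up
      }
      draining = true;
    }
    // Simplification: if the delegate rejects this call, `draining` stays set.
    delegate.execute(this::drain);
  }

  private void drain() {
    while (true) {
      Runnable next;
      synchronized (queue) {
        next = queue.poll();
        if (next == null) {
          draining = false;
          return;
        }
      }
      try {
        next.run();
      } catch (RuntimeException e) {
        // A real executor would log this; the sketch keeps draining so later tasks still run.
      }
    }
  }
}
```

Passing `Runnable::run` as the delegate gives inline execution, which matches the deterministic test setup the commit describes.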
Pass CallOptions.withExecutor(MoreExecutors.directExecutor()) when creating
the ClientCall for session streams in both SingleChannelPool and
ChannelPoolDpImpl. Netty I/O threads now deliver SessionStream.Listener
callbacks directly without going through the gRPC user thread pool.

All callbacks immediately trampoline through SessionImpl.sessionSyncContext
(fast, bounded bookkeeping) then dispatch user-facing work onto the
op-level SerializingExecutor and return, keeping the I/O thread free.

Added the invariant comment to SessionStream.Listener and the
sessionSyncContext field documenting this contract.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
isCancelled and realCall are now exclusively owned by ctx.getExecutor()
(the per-op SerializingExecutor), eliminating the cancel/drainTo race
that required the NOOP_CALL sentinel. cancel() eagerly removes from
pendingRpcs under the pool lock, then dispatches the state transition to
the op executor. drainTo() cancels the deadline monitor before dispatching,
and checks isCancelled on the op executor so a cancelled-before-drain rpc
returns its session cleanly via onVRpcComplete(). popClosableRpcs() and
tryDrainPendingRpcs() are simplified now that cancelled rpcs are removed
eagerly. close() copies pendingRpcs before clearing to avoid
ConcurrentModificationException when cancel() re-acquires the pool lock.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
ISSUE-007 documents that PendingVRpc.monitorDeadline() and per-session
heartbeat scheduling both contribute to ScheduledExecutorService heap
churn. At ~100ms heartbeat intervals (10 fires/sec/session) and ~1ms
vRPC p50, cancelled deadline futures accumulate as zombies in the
DelayQueue, inflating O(log n) insert cost for heartbeats. Mitigations
ranked from quick fix to long-term solution.

Step 5.5 added to THREADING_REFACTOR_PLAN.md between Steps 5 and 6:
switch both heartbeat and deadline monitoring to a pool-internal
ScheduledThreadPoolExecutor with setRemoveOnCancelPolicy(true),
eliminating zombie accumulation while avoiding new dependencies.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
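
The effect of `setRemoveOnCancelPolicy(true)` on cancelled entries can be observed with a small JDK-only experiment. The class and method names below are hypothetical; the one-hour delay is chosen only so that nothing fires during the measurement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RemoveOnCancelDemo {
  // Schedules `count` far-future tasks, cancels them all, and reports how many
  // entries are still sitting in the executor's delay queue.
  static int queuedAfterCancel(boolean removeOnCancel, int count) {
    ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    timer.setRemoveOnCancelPolicy(removeOnCancel);
    try {
      List<ScheduledFuture<?>> futures = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        futures.add(timer.schedule(() -> {}, 1, TimeUnit.HOURS));
      }
      for (ScheduledFuture<?> future : futures) {
        future.cancel(false);
      }
      return timer.getQueue().size();
    } finally {
      timer.shutdownNow();
    }
  }
}
```

With the default policy (`false`), the cancelled entries stay queued until their delay elapses; with `true`, each `cancel` call removes its entry immediately.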
@igorbernstein2 igorbernstein2 requested review from a team as code owners May 28, 2026 14:07
@google-cla

google-cla Bot commented May 28, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements key phases of the threading refactor plan for the session-based transport, replacing the global session lock with a per-session SynchronizationContext and routing op-level callbacks through a sequential opExecutor. It also configures gRPC session streams to use DirectExecutor for fast, non-blocking execution on Netty I/O threads and cleans up PendingVRpc state management. The review feedback highlights two important issues: first, removing the volatile modifier from nextHeartbeat can cause infinite loops in tests due to JIT compiler hoisting; second, performing O(N) removals on the pendingRpcs queue under the global lock during cancellation can lead to severe lock contention under high load.

Duration.ofMillis(Durations.toMillis(sessionParameters.getKeepAlive()));

- private volatile Instant nextHeartbeat;
+ private Instant nextHeartbeat;
Contributor

high

The field nextHeartbeat was changed from volatile Instant to Instant. However, getNextHeartbeat() is a public/package-private getter that is accessed from other threads, specifically in the polling loop of SessionImplTest.java (while (!session.getNextHeartbeat().equals(expectedHeartbeat) ...). Without the volatile modifier, the JIT compiler is allowed to hoist the read of nextHeartbeat out of the loop, leading to an infinite loop/hang and causing the test to time out. To comply with the Java Memory Model and ensure thread safety and visibility, nextHeartbeat must remain volatile.

Suggested change
- private Instant nextHeartbeat;
+ private volatile Instant nextHeartbeat;
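
As a rough illustration of the visibility concern raised above, the sketch below polls a `volatile` field from one thread while another thread writes it. `HeartbeatHolder` and `awaitHeartbeat` are hypothetical names, and the loop is only loosely modelled on the test loop quoted in the comment.

```java
import java.time.Instant;

final class HeartbeatHolder {
  // volatile: every read in the polling loop below must observe the latest write.
  private volatile Instant nextHeartbeat = Instant.EPOCH;

  void setNextHeartbeat(Instant value) {
    nextHeartbeat = value;
  }

  Instant getNextHeartbeat() {
    return nextHeartbeat;
  }

  // Polls from the calling thread until the expected value appears or the timeout expires.
  boolean awaitHeartbeat(Instant expected, long timeoutMillis) {
    long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (!getNextHeartbeat().equals(expected)) {
      if (System.nanoTime() - deadlineNanos > 0) {
        return false; // bounded wait instead of a silent hang
      }
      Thread.yield();
    }
    return true;
  }
}
```

Without `volatile`, the Java Memory Model does not guarantee that the polling thread ever sees the new value, which is the failure mode the comment describes.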

Comment on lines 668 to +670
synchronized (SessionPoolImpl.this) {
if (isCancelled) {
return;
}
pendingRpcs.remove(this); // eager removal; no-op if already drained
}
Contributor

medium

In cancel(Status, boolean), pendingRpcs.remove(this) is called under the global SessionPoolImpl.this lock. Since pendingRpcs is an ArrayDeque, the remove(Object) operation has O(N) time complexity as it requires a linear scan of the queue and shifting elements. Under high load or transient queuing (where queue depth can reach tens of thousands of entries as noted in ISSUES.md), this linear scan under a global lock can cause severe lock contention and CPU overhead. Additionally, in performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility. Consider either reverting to the previous lazy removal strategy (where cancelled RPCs are lazily popped from the queue during drain, which is O(1)), or changing pendingRpcs to a data structure that supports O(1) arbitrary removal while preserving FIFO order, such as a LinkedHashSet (or a set backed by LinkedHashMap), and protecting it with an explicit lock.

References
  1. In performance-sensitive code, prefer using explicit locks over the 'synchronized' keyword to protect shared state while ensuring thread safety and visibility.
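
One way to picture the LinkedHashSet alternative suggested above is the sketch below. It is an assumption-laden illustration rather than a drop-in replacement for pendingRpcs: `PendingQueue` is a hypothetical class, it rejects duplicate elements, and it relies on the elements' `equals`/`hashCode`.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

final class PendingQueue<T> {
  // Insertion-ordered set: iteration is FIFO, and remove(Object) is a hash lookup.
  private final LinkedHashSet<T> items = new LinkedHashSet<>();

  synchronized boolean add(T item) {
    return items.add(item);
  }

  // Expected O(1), unlike ArrayDeque.remove(Object), which scans linearly.
  synchronized boolean remove(T item) {
    return items.remove(item);
  }

  // Removes and returns the oldest element, or null when the queue is empty.
  synchronized T poll() {
    Iterator<T> it = items.iterator();
    if (!it.hasNext()) {
      return null;
    }
    T head = it.next();
    it.remove();
    return head;
  }

  synchronized int size() {
    return items.size();
  }
}
```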

Contributor

This review comment seems reasonable?

Executor opExecutor = MoreExecutors.newSequentialExecutor(userCallbackExecutor);
RetryingVRpc<SessionReadRowRequest, SessionReadRowResponse> retry =
- new RetryingVRpc<>(() -> sessionPool.newCall(readRowDescriptor), backgroundExecutor);
+ new RetryingVRpc<>(() -> sessionPool.newCall(readRowDescriptor), backgroundExecutor, opExecutor);
Contributor Author

Why are we injecting the opExecutor in the middle of the middleware chain? I think it needs to be injected at the top of the chain. Maybe we need a middleware to create the VRpcContext.

sessionPool.start(openReq, new Metadata());

- return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor);
+ return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor, executor);
Contributor Author

Background executor and the userCallbackExecutor should not be the same thing. The userCallbackExecutor should default to a CachedThreadPool, and we should respect the user's override from the compat layer in gax's InstantiatingGrpcChannelProvider

@Override
public void start(ReqT req, VRpcCallContext ctx, VRpcListener<RespT> listener) {
- syncContext.execute(
+ opExecutor.execute(
Contributor Author

Should this be moved out to the start of the middleware chain?

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
- syncContext.execute(
+ opExecutor.execute(
Contributor Author

same as above, should this move out to the start of the middleware chain?

.build();

private final Clock clock;
private final ScheduledExecutorService scheduledExecutor;
Contributor Author

This was previously implemented as a single task per pool. Now it's per vRpc.
Please add a TODO to use a HashedWheelTimer to minimize the overhead of the change.

Level.WARNING,
String.format(
"Unhandled exception in session SynchronizationContext for %s",
info.getLogName()),
Contributor Author

We need better error handling. If something throws an exception, it will leave the session in a bad state. I think unhandled exceptions need to cancel the session as a whole.

forceClose(MISSED_HEARTBEAT_CLOSE_REQUEST);
return;
}
scheduleHeartbeatCheck();
Contributor Author

Since the heartbeats moved per session, do we still need to track clock.instant().isAfter(nextHeartbeat)? Shouldn't the task just be scheduled for the next checkpoint?

SessionImpl.HEARTBEAT_CHECK_INTERVAL.toMillis(),
SessionImpl.HEARTBEAT_CHECK_INTERVAL.toMillis(),
TimeUnit.MILLISECONDS);
// Heartbeat monitoring is now done per-session via SessionImpl.sessionSyncContext.
Contributor Author

This comment doesn't make sense outside of this PR.

sessionPool.start(openReq, new Metadata());

- return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor);
+ return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor, executor);
Contributor

Is this right? I don't think we want to use the same executor for user callbacks and background tasks?

ClientCall<SessionRequest, SessionResponse> innerCall =
- channelWrapper.channel.newCall(desc, callOptions);
+ channelWrapper.channel.newCall(
+ desc, callOptions.withExecutor(MoreExecutors.directExecutor()));
Contributor

Can we use directExecutor?

Avoid directExecutor() for Blocking Tasks: Using directExecutor() runs the RPC handler on the same thread that handles network I/O. While this can improve performance by reducing context switching, it is dangerous if your code is not 100% non-blocking; any delay will block the network event loop and halt communication for all other connections.
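
The trade-off quoted above comes down to which thread runs the task. The JDK-only sketch below makes that visible; `Runnable::run` is used here as a stand-in for Guava's `MoreExecutors.directExecutor()`, and `InlineVsPooled` is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class InlineVsPooled {
  // Returns the name of the thread that a task runs on when submitted through `executor`.
  static String runningThread(Executor executor) {
    CompletableFuture<String> name = new CompletableFuture<>();
    executor.execute(() -> name.complete(Thread.currentThread().getName()));
    return name.join();
  }
}
```

With an inline executor the returned name is the caller's own thread, so any blocking inside the task stalls that caller (in the PR's case, a Netty I/O thread); with a pool-backed executor the task runs elsewhere.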

- public RetryingVRpc(Supplier<VRpc<ReqT, RespT>> supplier, ScheduledExecutorService executor) {
+ public RetryingVRpc(
+ Supplier<VRpc<ReqT, RespT>> supplier,
+ ScheduledExecutorService scheduledExecutor,
Contributor

maybe

Suggested change
- ScheduledExecutorService scheduledExecutor,
+ ScheduledExecutorService backgroundExecutor,


public static VRpcCallContext create(
Deadline deadline, boolean isIdempotent, VRpcTracer tracer) {
return create(deadline, isIdempotent, tracer, MoreExecutors.directExecutor());
Contributor

Should we remove this method? Or should we pass in MoreExecutors.newSequentialExecutor(directExecutor())?

this.sessionSyncContext =
new SynchronizationContext(
(thread, e) ->
logger.log(
Contributor

do we need to close the stream when this happens?

updateState(SessionState.READY);
tracer.onOpen(localPeerInfo);
sessionListener.onReady(openSession);
scheduleHeartbeatCheck();
Contributor

The heartbeat check should only be scheduled on in-use sessions. So this should probably move to startRpc, and be cancelled in handleSessionResponse.

VRpcResult.createUncommitedError(
Status.UNAVAILABLE.withCause(
new IllegalStateException("SessionPool is closed"))));
ctx.getExecutor()
Contributor

Why do we use an executor for vRPCs but a syncContext in sessions? What's the difference?


// TODO: fix lock sharing
// The `lock` parameter is the pool-wide monitor (SessionPoolImpl.this). It is typed as Object
// because Watchdog is a static nested class and cannot reference the outer instance type in its
// constructor signature without creating a circular dependency. Phase 5 will replace this with
Contributor

I think we can remove the comment about "phase 5".


2 participants