diff --git a/java-bigtable/ISSUES.md b/java-bigtable/ISSUES.md new file mode 100644 index 000000000000..a704ce93239c --- /dev/null +++ b/java-bigtable/ISSUES.md @@ -0,0 +1,157 @@ +# Pre-existing issues found during threading refactor + +These are correctness concerns or suboptimal patterns noticed while reading the code. +They are not introduced by the refactor. + +--- + +## SessionImpl + +### ISSUE-001: `@Nullable closeReason` is misleading — field is never null at the one external read site + +`closeReason` is declared `@Nullable` (line 103 before Step 1) but `dispatchStreamClosed` calls +`closeReason.getReason()` after the synchronized block with no null guard. The call is safe +because every code path that exits `dispatchStreamClosed`'s synchronized block without an early +return guarantees `closeReason` is non-null: +- `state == WAIT_SERVER_CLOSE`: `closeReason` was set before the state was entered. +- `state == CLOSED`: returns early inside the block. +- Any other state: set inside the block by the abnormal-close error path. + +However, the `@Nullable` annotation makes the invariant non-obvious and means static analysers +(and future readers) cannot rely on the annotation. Either remove `@Nullable` and add a comment +asserting that it is always set before the closing state, or change the field type to carry a +sentinel value instead of null. + +**File:** `session/SessionImpl.java` + +--- + +### ISSUE-002: `sessionParameters` was missing `@GuardedBy("lock")` annotation + +The field `sessionParameters` is read and written only under `lock`, but had no `@GuardedBy` +annotation. Fixed in Step 1. + +--- + +## RetryingVRpc + +### ISSUE-003: `requestNext()` is unimplemented — throws `UnsupportedOperationException` + +`RetryingVRpc.requestNext()` always throws `UnsupportedOperationException`. The streaming retry +path is not implemented. This means any code path that calls `requestNext()` after `onMessage` +will crash. 
Tracked for future work (server-streaming vRPC support). + +**File:** `middleware/RetryingVRpc.java` + +--- + +## SessionPoolImpl / Watchdog + +### ISSUE-004: `Watchdog.lock` typed as `Object` instead of the actual outer lock + +The `Watchdog` static inner class accepts the pool-wide monitor as `Object lock`. This prevents +static analysis tools from verifying the locking discipline and makes the contract implicit. +Will be addressed in Phase 5 when the per-AFE lock sharding redesign gives the locking model +an explicit, typed structure. + +**File:** `session/SessionPoolImpl.java` — `Watchdog` inner class + +--- + +## VRpcImpl + +### ISSUE-005: `cancelRpc` is silent when the current RPC does not match + +In `SessionImpl.cancelRpc()`, if `currentRpc != null` but `rpcId != currentRpc.rpcId`, the +cancel is silently dropped. This is the expected behavior for a late cancel (RPC already +finished), but there is no logging or tracing event, making silent drops invisible in metrics. +Consider adding a debug-level counter or log. + +**File:** `session/SessionImpl.java` — `cancelRpc()` + +--- + +## VRpcListener / UnaryResponseFuture + +### ISSUE-006: An exception thrown inside the op executor dispatch chain can leave the caller's future permanently unresolved (silent hang) + +**Observed during Step 4 of the threading refactor.** When `VRpcImpl` dispatches a callback +via `ctx.getExecutor().execute(...)`, any uncaught exception that escapes from within that task +can silently orphan the caller's `VRpcListener` (and any `Future` backed by it): + +1. `VRpcImpl.handleError()` calls `ctx.getExecutor().execute(() -> listener.onClose(result))`. +2. If `ctx.getExecutor()` is a `SynchronizationContext` and the listener throws (e.g. + `throwIfNotInThisSynchronizationContext()` on the wrong thread), the SyncContext's + uncaught-exception handler fires. +3. The handler calls `retrying.cancel(...)`, which submits a new task to the same + SynchronizationContext. +4. 
That cancel task eventually reaches `RetryingVRpc.Done.onStart()`, which calls + `listener.onClose(CANCELLED)` — but only if the SyncContext drain loop is still healthy. +5. If the SyncContext drain exited early or the downstream cancel itself throws, `listener.onClose` + is never called, so `f.get()` blocks forever — there is no timeout, no error, no log. + +**Symptoms:** test/operation hangs indefinitely on `Future.get()` after an exception in the +callback chain. + +**Root cause class:** There is no invariant enforcement that every code path that may fail +*must* call `listener.onClose()` exactly once. Exceptions that escape executor tasks bypass +the normal close path. + +**Desired fix (post-refactor):** After the threading refactor is complete, audit every executor +`execute()` task that can invoke a `VRpcListener` to ensure a try/catch wraps the body and +calls `listener.onClose(createLocalTransportError(...))` in the catch branch. The +`SynchronizationContext` uncaught-exception handler should be a last-resort log + metric, not +a recovery mechanism. Consider a typed `VRpcTask` wrapper that enforces the "always close" +contract at compile time. + +**Files:** `session/VRpcImpl.java`, `middleware/RetryingVRpc.java` + +--- + +## SessionPoolImpl — PendingVRpc + +### ISSUE-007: `PendingVRpc.monitorDeadline()` causes `ScheduledExecutorService` heap churn under load + +Every RPC that cannot immediately acquire a session goes through `PendingVRpc` and schedules a +deadline-monitoring `ScheduledFuture` on the pool's `ScheduledExecutorService`. Under normal +conditions the future is cancelled almost immediately — sessions are expected to be available +within ~1 ms at p50. `ScheduledFuture.cancel(false)` marks the future cancelled but does **not** +remove it from the underlying `DelayQueue`. 
Cancelled futures remain in the heap until their +deadline expires naturally (typically seconds to minutes), inflating the queue and increasing +O(log n) insert/remove cost for every subsequent schedule operation. + +**Why this matters at the given operating point:** + +Both `PendingVRpc.monitorDeadline()` and per-session heartbeat scheduling hit the same +`ScheduledExecutorService`, and their effects compound. + +**Heartbeat pressure:** At ~100 ms per heartbeat, each session fires 10 `schedule()` calls/sec. +Heartbeat tasks run and reschedule (they do not cancel, so they produce no zombies), but they +sustain O(log n) heap churn at 10N ops/sec for N sessions. This is not negligible in +context: at a vRPC p50 of ~1 ms, a heartbeat fires every ~100 vRPCs, so background scheduling +adds about one `schedule()` call per 100 vRPCs on a busy session. + +**Deadline monitor zombie accumulation:** `cancel(false)` marks a future cancelled but does not +remove it from the `DelayQueue`. At ~1 ms p50 session-wait, a deadline future (say, 60 s) is +created and cancelled almost immediately. At 10 000 RPC/s with 10 % transiently pending: +1 000 futures/sec added, each living ~60 s → steady-state zombie count of ~60 000 entries. + +**Compounding:** Heartbeat inserts pay O(log n) against a queue inflated by zombie deadline +futures. With 10 sessions and 60 000 zombies, each heartbeat insert costs about log2(60 010) ≈ 16 +comparisons instead of log2(10) ≈ 3. The absolute per-insert cost is small today and grows only +logarithmically with queue size; what grows linearly is the zombie count (with RPC throughput) +and the number of heartbeat inserts per second (with session count). + +**Mitigations (in increasing order of impact):** + +1. **Short-circuit**: if the deadline is already expired when `PendingVRpc.start()` is called, + reject immediately without scheduling a future. +2. **`setRemoveOnCancelPolicy(true)`** on the `ScheduledThreadPoolExecutor`: removes cancelled + tasks from the queue eagerly in O(log n). Eliminates zombie accumulation. Requires + controlling the executor construction. +3. 
**Hashed wheel timer** (e.g., Netty's `HashedWheelTimer`): O(1) insert and O(1) cancel with + no zombie accumulation. Neither deadline monitoring nor heartbeat checking requires + sub-millisecond precision; a wheel tick of ~10 ms is appropriate for both. This is the right + long-term fix for both sources of churn. + +**Files:** `session/SessionPoolImpl.java` — `PendingVRpc.monitorDeadline()` and +`session/SessionImpl.java` — `scheduleHeartbeatCheck()` diff --git a/java-bigtable/SESSION_TRANSPORT_NOTES.md b/java-bigtable/SESSION_TRANSPORT_NOTES.md new file mode 100644 index 000000000000..8905647e2d65 --- /dev/null +++ b/java-bigtable/SESSION_TRANSPORT_NOTES.md @@ -0,0 +1,377 @@ +# Session-Based Transport — Code Map + +Reference notes for `google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/`. +A new transport layered next to the existing gax-based stack. The "session" is a long-lived bidi gRPC +stream that carries many virtual RPCs (vRPCs), amortizing per-stream setup (TLS, auth, AFE routing) +across many short ops like `ReadRow` / `MutateRow`. 
+ +Root path (omitted from file refs below): +`google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/` + +## Big picture + +``` +gax UnaryCallable surface (existing public API) + └── compat/ops/DivertingUnaryCallable ← routes % of calls to new stack via ClientConfigurationManager + └── compat/ops/ReadRowShim, MutateRowShim, ReadRowShimInner ← per-op handlers + └── api/TableAsync · AuthorizedViewAsync · MaterializedViewAsync ← thin facade + └── api/TableBase.readRow/mutateRow + ↓ wraps in + middleware/CancellableVRpc + └── middleware/RetryingVRpc + └── session/VRpcImpl ← transport-level VRpc + ↑ created by + session/SessionImpl.newCall(VRpcDescriptor) + ↑ chosen by + session/SessionPoolImpl ← uses session/DynamicPicker + ↑ runs on + channels/SessionStream (= one bidi gRPC ClientCall) + ↑ from + channels/ChannelPool (Single | Switching | Fallback | DpImpl) + ↑ built from + api/ChannelProviders (CloudPath | DirectAccess | …) +``` + +`dp/` decides DirectPath eligibility once at shim startup. `csm/` provides VRpc/session tracers +plugged in at `TableBase` and `SessionImpl`. + +--- + +## api/ — public-facing entry point of the new transport + +- **Client.java** — top-level entry. `Client.create(ClientSettings)` builds creds, `Metrics`, starts + `ClientConfigurationManager` (fetches initial server config), builds and starts a + `SwitchingChannelPool`. `openTableAsync()` / `openAuthorizedViewAsync()` / `openMaterializedViewAsync()` + create + start a `SessionPoolImpl` via the corresponding `*Async`. `close()` tears down pools, metrics, + channel pool, config manager, executor — in that order. +- **ClientSettings.java** — AutoValue: `ChannelProvider`, `InstanceName`, `appProfileId`. Default + provider is `DirectAccess.withFallback(DEFAULT_HOST)`. 
+- **ChannelProviders.java** — factories: `CloudPath` (TLS + call creds), `DirectAccess` + (`google-c2p:///` URI + `GoogleDefaultChannelCredentials` + installs `DirectPathIpInterceptor`), + `RawDirectPath` (ALTS to hardcoded IPs), `EmulatorChannelProvider` (plaintext). Each exposes + `getFallback()` (non-empty only for `DirectAccess`) and `updateFeatureFlags()` so the + `OpenSessionRequest` advertises `trafficDirectorEnabled`/`directAccessRequested` to the server. + `ConfiguredChannelProvider` decorates with channel-level settings (keepalive, metric interceptors). +- **TableBase.java** — shared base for the three `*Async`. `createAndStart()` builds a + `SessionPoolImpl` for the given `SessionDescriptor` + open-request proto. `readRow`/`mutateRow` wrap + `pool.newCall(VRpcDescriptor)` in `RetryingVRpc` → `CancellableVRpc`, attach a `VRpcTracer` via + `Metrics.newTableTracer`, build `VRpcCallContext` (deadline, idempotency), then start. +- **TableAsync.java / AuthorizedViewAsync.java / MaterializedViewAsync.java** — per-resource facades. + Build the resource-specific `OpenXRequest`, pick the matching `VRpcDescriptor` constants. Expose + `CompletableFuture<…> readRow(req, deadline)` / `mutateRow(...)` by creating a `UnaryResponseFuture` + and handing it to `TableBase`. Materialized view has no `mutateRow` (read-only). +- **UnaryResponseFuture.java** — `CompletableFuture` + `VRpcListener`. Buffers one message, + completes on close (OK → value, error → `VRpcException`). +- **VRpcException.java** — `StatusRuntimeException` carrying the full `VRpcResult` (so retry callers + can see committed vs uncommitted). +- **InstanceName / TableName / AuthorizedViewName / MaterializedViewName** — AutoValue resource-name + helpers with `toString()` / `parse(String)`. +- **Util.java** — `composeMetadata()` builds session-open headers (`bigtable-features`, + `x-goog-request-params`); `isIdempotent()` inspects mutation list for non-idempotent ops + (aggregates, server timestamps). 
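As an illustration of the `UnaryResponseFuture` entry above, here is a minimal sketch of the "buffer one message, complete on close" pattern. It uses only the JDK. `SketchListener` and its `onClose(boolean, String)` signature are simplified, hypothetical stand-ins for `VRpcListener` / `VRpcResult`, whose real signatures these notes do not spell out, and the sketch assumes callbacks arrive serialized on one executor.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for VRpcListener (assumption: the real
// interface carries a richer result object than a boolean plus a string).
interface SketchListener<T> {
  void onMessage(T message);

  void onClose(boolean ok, String errorDescription);
}

// A CompletableFuture that is also its own listener: it buffers the single
// response of a unary op and resolves itself when the op closes.
final class SketchUnaryFuture<T> extends CompletableFuture<T> implements SketchListener<T> {
  private T buffered; // at most one message is expected for a unary op

  @Override
  public void onMessage(T message) {
    if (buffered != null) {
      // A second message violates the unary contract; fail the future.
      completeExceptionally(new IllegalStateException("unary op received a second message"));
      return;
    }
    buffered = message;
  }

  @Override
  public void onClose(boolean ok, String errorDescription) {
    if (ok) {
      complete(buffered); // no-op if the future already failed above
    } else {
      completeExceptionally(new RuntimeException(errorDescription));
    }
  }
}
```

The point of the design is that the caller only ever sees the future: every terminal path funnels through `onClose`, which is also why a missed `onClose` leaves the future unresolved.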
+ +--- + +## session/ — session lifecycle, pool, pickers, transport-level vRPC + +A `Session` = one bidi gRPC stream of `SessionRequest`/`SessionResponse`. Many sequential vRPCs +multiplex onto it (one-at-a-time today; multi-vRPC noted as TODO). + +States: `NEW → STARTING → READY → CLOSING → WAIT_SERVER_CLOSE → CLOSED`. + +### Session core +- **Session.java** — interface: `start(openReq, md, listener)`, `newCall(VRpcDescriptor)`, + `close()`, `forceClose()`. `Listener` exposes `onReady` / `onGoAway` / `onClose`. +- **SessionImpl.java** — concrete. Wraps a `SessionStream` + a `SessionList.SessionHandle`. Wraps + open request in `SessionRequest` envelope; `dispatchResponseMessage()` routes inbound + `SessionResponse`: `OPEN_SESSION` → READY + notify pool; `SESSION_PARAMETERS` → heartbeat interval; + `VIRTUAL_RPC`/`ERROR` → active `VRpcImpl`; `GO_AWAY` → graceful close; `HEARTBEAT` → reset deadline; + `SESSION_REFRESH_CONFIG` → renegotiate `openParams`. `startRpc()` sends a `VirtualRpcRequest` for + the owning `VRpcImpl`. Close is two-phase: CLOSING → wait for current vRPC → `startGracefulClose()` + (send `CloseSessionRequest`, half-close). Watchdog force-closes if stuck in `WAIT_SERVER_CLOSE`. +- **SessionFactory.java** — thin: calls `channelPool.newStream(method, callOpts)`. Decouples pool + from channel layer. +- **SessionInfo.java** — AutoValue: pool ref + log name `"-"`. +- **SessionList.java** — bookkeeping (caller holds the `SessionPoolImpl` lock). Groups sessions by + AFE via `AfeHandle` (keyed on `PeerInfo.applicationFrontendId`); per-AFE idle queue + `PeakEwma` + latency. `SessionHandle` lifecycle: `onSessionStarted` (start→AFE queue), `checkoutSession`, + `onVRpcFinish` (returns + updates latency), `onSessionClosing` (GoAway), `onSessionClosed`. Also + holds `PoolStats` (starting/ready/inUse/expectedCapacity) for `PoolSizer`. +- **SessionPool.java** — interface typed on the open-request proto. +- **SessionPoolImpl.java** — orchestrator. 
`start()` builds `OpenParams`, pre-creates sessions via + `PoolSizer.getScaleDelta()`, starts Watchdog + heartbeat scheduler. `newCall()` asks + `DynamicPicker`; if hit → `ForwardingVRpc` that calls `onVRpcComplete()` on close (returns session, + drains pending). On miss → `PendingVRpc` parked in a deque, with deadline monitoring → + `DEADLINE_EXCEEDED`. Session-ready event triggers `tryDrainPendingRpcs()`. GoAway triggers + `PoolSizer.handleGoAway()` to decide replacement. Tracks `consecutiveUnimplementedFailures` — + consumed by the shim layer to fall back to classic. +- **SessionPoolInfo.java** — AutoValue: `ClientInfo`, `SessionType`, name. +- **SessionUtil.java** — extracts `SessionType` from proto method/message options. +- **SessionCreationBudget.java** — token bucket throttling concurrent session-creates. Failures add + time-delayed re-grant; live-updated from `ClientConfigurationManager`. + +### Pickers +- **Picker.java** — abstract: `Optional pickSession()`. +- **SimplePicker.java** — uniform-random AFE → dequeue one session. +- **DynamicPicker.java** — wraps a concrete picker; swaps per `LoadBalancingOptions`. RANDOM → + Simple; LEAST_IN_FLIGHT → `LeastInFlightPicker`; PEAK_EWMA → `LeastLatencyPicker`; default → + `LeastInFlightPicker`. +- **LeastInFlightPicker.java** — partial Fisher-Yates over `randomSubsetSize`, picks AFE with + fewest outstanding (= refCount − idle queue size). +- **LeastLatencyPicker.java** — same shape, scores by `AfeHandle.getE2eCost()` (peak-EWMA latency). + Experimental. +- **PoolSizer.java** — desired count = in-use + headroom-fraction(in-use), clamped to + `[minIdleSessions, maxIdleSessions]`. `handleGoAway`/`handleSessionClose`/`handleNewCall` return + booleans for replacement. + +### vRPC at the transport seam +- **VRpcDescriptor.java** — static registry of all vRPC types. 
Each carries: `SessionDescriptor` + (session family — `SessionType`, `MethodDescriptor`, + header-params extractor for RLS, log-name extractor), `MethodInfo`, `Encoder`/`Decoder` (typed req + ↔ payload bytes), `LegacyConverter` (to classic proto). Constants: `READ_ROW`, `MUTATE_ROW`, + `READ_ROW_AUTH_VIEW`, `MUTATE_ROW_AUTH_VIEW`, `READ_ROW_MAT_VIEW`. +- **VRpcImpl.java** — transport-level `VRpc`. Created by `SessionImpl.newCall`. `start()` encodes + typed request → `VirtualRpcRequest` (rpc_id, deadline, traceparent, payload) → `session.startRpc`. + `handleResponse`/`handleError` decode and fire `VRpcListener`. `handleSessionClose` for stream + death. `cancel()` → `session.cancelRpc`. Inner `VRpcSessionApi` is the narrow view of `SessionImpl` + it depends on. + +--- + +## channels/ — physical gRPC channels & their bidi streams + +A `SessionStream` = one `ClientCall` (one bidi call). + +- **SessionStream.java** — interface: `start(Listener, md)`, `sendMessage`, `halfClose`, + `forceClose`. Listener: `onBeforeSessionStart(PeerInfo)` (fires on headers, gives AFE identity), + `onMessage`, `onClose`. +- **SessionStreamImpl.java** — wraps `ClientCall`. On `onHeaders` decodes base64 + `bigtable-peer-info` → `PeerInfo`, fires `onBeforeSessionStart`. Requests `MAX_VALUE` messages. + Checks ALTS context to detect DirectPath. +- **ForwardingSessionStream.java** — delegation skeleton for decorators. +- **FailingSessionStream.java** — pre-failed: `start()` immediately `onClose(status)`. Used by + `ChannelPoolDpImpl` when pool is closed. +- **ChannelPool.java** — interface: `start`, `close`, `newStream(method, callOpts)`, `updateConfig`. + Sole output to session layer = `SessionStream`. +- **SingleChannelPool.java** — one `ManagedChannel`; for emulator/single-endpoint. +- **ChannelPoolDpImpl.java** — DirectPath-aware dynamic pool. Groups channels by AFE + (`AfeChannelGroup` keyed on `applicationFrontendId`). 
`newStream()` picks least-loaded group below + `softMaxPerGroup`; else uses `startingGroup` channel or creates new. After + `onBeforeSessionStart`, "rehomes" channel to its AFE group. Channel slot freed on `onClose`; + recycled on server-drain / UNIMPLEMENTED. Periodic `serviceChannels()` (1 min) thins parallel + channels per group to 1 and resizes group count to `totalStreams / softMaxPerGroup * 2`. + Invariant: one physical channel per AFE. +- **SwitchingChannelPool.java** — reads `ChannelPoolConfiguration.modeCase`: `CLOUD_PATH_ONLY` → + fallback provider's pool; `DIRECT_ACCESS_ONLY` → primary's pool; + `DIRECT_ACCESS_WITH_FALLBACK` → `FallbackChannelPool`. Atomically swaps delegate on config change + (start new → close old). +- **FallbackChannelPool.java** — dual-pool with automatic switch. `currentPool` Atomic starts as + primary (DirectPath). Counts primary success on `onBeforeSessionStart`, failure on non-OK + `onClose` where session never opened. Periodic check: if `failures/(succ+fail) > errorRate`, + switch to secondary + notify `PoolFallbackListener`. Switches back if config disables fallback. +- **FallbackConfiguration.java** — AutoValue: `isEnabled`, `errorRate`, `checkInterval`, + primary/fallback pool refs. +- **DirectPathIpInterceptor.java** — logs when connected IP's DP-ness differs from expected + (observability only). +- **DirectpathEnforcer.java** — installed on primary inside `FallbackChannelPool`. On `onHeaders` + if no ALTS context → throws `UNAVAILABLE`, which becomes a counted primary failure. + +--- + +## middleware/ — decorator stack above the transport VRpc + +Stacked as decorators (not interceptor chain). Each layer is created fresh per op via +`Supplier`. Typical: `CancellableVRpc → RetryingVRpc → VRpcImpl`. + +- **VRpc.java** — interface for one logical RPC. Unifies unary + server-streaming by always being + server-streaming: `start(req, ctx, listener)` then `requestNext()`. 
Hosts: + - `VRpcCallContext` — deadline, idempotency, attempt number, `VRpcTracer`. + - `OperationInfo` — monotonic attempt counter + original deadline (retry-budget tracking). + - `VRpcResult` — final outcome. `State` ∈ `{UNCOMMITED, TRANSPORT_FAILURE, SERVER_RESULT, + USER_FAILURE}` + cluster info + backend latency + `RetryInfo` + `rejected` flag. +- **ForwardingVRpc.java** — pass-through base; `ForwardListener` forwards `onMessage`/`onClose`. +- **RetryingVRpc.java** — runs inside a gRPC `SynchronizationContext` (lock-free state machine + `Idle → Active → Scheduled|Done`). Holds `Supplier` `attemptFactory`; fresh inner per + attempt. `shouldRetry()`: (a) server returned `RetryInfo` with delay + deadline left, OR (b) ≤3 + attempts for `UNCOMMITED` or idempotent `TRANSPORT_FAILURE`. Schedules via + `SynchronizationContext.schedule` on injected executor, wraps gRPC `Context` + OTel context for + propagation. **NOT** gax retry. `requestNext()` throws `UnsupportedOperationException` — streaming + retry not implemented. +- **CancellableVRpc.java** — extends `ForwardingVRpc`. Adds a gRPC `Context.CancellationListener` + that calls `cancel()` on underlying. Explicitly skips `DEADLINE_EXCEEDED` (transport handles it). + +--- + +## compat/ — bridge from gax public API to the new stack + +Compatibility with the existing gax-based public API. New stack is opt-in per call, gated server-side. + +- **Shim.java** — interface: `close()`, `decorateReadRow(classic, rowAdapter, settings)`, + `decorateMutateRow(classic, settings)`. Only `ReadRow` and `MutateRow` shimmed today; others stay + on classic path. 
+- **ShimImpl.java** — startup: (1) validates transport is `InstantiatingGrpcChannelProvider` else + `DisabledShim`; (2) builds a DirectPath provider w/ cloud fallback, runs blocking + `checkDirectAccessAvailable()` probe (`GetClientConfiguration` RPC, 5s timeout) — falls back to + `GaxBasicChannelProvider` if probe fails or `PeerInfo.TransportType` isn't + `DIRECT_ACCESS`/`SESSION_DIRECT_ACCESS`; (3) builds separate basic provider for config manager + (always cloud); (4) fetches initial config sync (else `DisabledShim`); (5) sets sessions-required + flag if config requires; (6) constructs `Client`, `ReadRowShimInner`, `MutateRowShim`. +- **DisabledShim.java** — no-op; `decorate*` returns classic unchanged. +- **Util.java** — `createSessionMap(factory)` = `LoadingCache` (5-min access expiry, removal + listener calls `close()` on evicted). `extractDeadline(GrpcCallContext, defaultTimeout)` = + earliest of ambient gRPC deadline, explicit call-opt deadline, gax retry total timeout. +- **FutureAdapter.java** — `CompletableFuture` → `ApiFuture`. Holds a `CancellableContext` + closed when future completes; `cancel()` cancels both. +- **GaxDirectAccessChannelProvider.java** — `ChannelProvider`. Checks `canUseDirectPath()`; falls + back silently. `createWithFallback()` keeps basic provider as fallback. Sets + `trafficDirectorEnabled=true`, `directAccessRequested=true`. +- **GaxBasicChannelProvider.java** — non-DP provider. `attemptDirectPath(false)` + `CredInterceptor` + installing `CallCredentials`. Empty `getFallback()`. + +### compat/ops/ +- **UnaryShim.java** — interface: `call(req, deadline) → CompletableFuture`, `supports(req)`, + `Closeable`. Owns `SessionPool`s, GC'd via Util session map. +- **DivertingUnaryCallable.java** — traffic-splitting callable. Per-call: if `supports(req)` → + reads `sessionLoad` float from `ClientConfigurationManager.getSessionConfiguration()`. 0 → always + classic; 1 → always experimental; else random. 
Experimental path: creates `CancellableContext`, + calls `experimental.call`, translates errors → gax `ApiException`, returns `FutureAdapter`. +- **ReadRowShimInner.java** — stateful. Three `LoadingCache`s: table → `TableAsync`, auth-view → + `AuthorizedViewAsync`, mat-view → `MaterializedViewAsync` (all opened with write permission at + `Client`). `supports()` checks `consecutiveUnimplementedFailures < 30` unless `hasSession()`. + `call()` translates `Query` → `SessionReadRowRequest`, dispatches to the resource async. +- **ReadRowShim.java** — stateless: pairs `ReadRowShimInner` with a `RowAdapter`. Translates + proto response to user-facing row by walking families/columns/cells via `RowBuilder`. +- **MutateRowShim.java** — analog for writes. Caches `TableAsync` + `AuthorizedViewAsync` (write). + No mat-view. + +**Rollout strategy**: shim is an all-or-nothing startup gate (gRPC transport + DP probe + initial +config), then per-call float ratio from server config. Server can ramp traffic without a client +release. + +--- + +## csm/ — client-side metrics + +Bigtable bundles its own OTel exporter that's on by default. Three metric flavors: +table-based (bigtable_table mirror), client-based (bigtable_client e2e health), client-based gRPC +(from gRPC OTel plugin). See `csm/package-info.java`. + +### Top-level +- **Metrics.java** — vended interface. `newTableTracer(poolInfo, descriptor, deadline) → VRpcTracer`, + `newSessionTracer(poolInfo) → SessionTracer`, `getPoolFallbackListener()`, + `getDirectPathCompatibleTracer()`, `getDebugTagTracer()`, `createTracerFactory(clientInfo) → + ApiTracerFactory` (legacy gax bridge), `configureGrpcChannel(builder)` (attaches OTel gRPC + plugin), `start`, `close`. 
+- **MetricRegistry.java** — typed defs for every metric instrument: operation/attempt latency, + retry count, first-response, server, channel-pool outstanding, connectivity errors, session + uptime/duration/open latency, transport latency, pacemaker delay, channel fallback count, DP-compat + gauge, debug-tag count, batch flow-control, remaining deadline. Also registers gRPC-internal + metric pass-throughs (RLS, xDS, subchannel). Two `RecorderRegistry` factories: internal vs user. +- **MetricsImpl.java** — concrete. Holds both `RecorderRegistry`s, `GrpcOpenTelemetry`, + `ChannelPoolMetricsTracer`, `DirectPathCompatibleTracer`, `DebugTagTracer`, `Pacemaker`, + `PoolFallbackListener`. `newTableTracer` composes up to 3 tracers into `CompositeVRpcTracer`: + internal `VRpcTracerImpl`, optional user `VRpcTracerImpl`, optional `UserApiVRpcTracer`. + `start()` kicks off channel-pool polling, pacemaker, 1-min session-uptime loop. +- **NoopMetrics.java** — disabled mode (tests / no OTel). + +### tracers/ (the session-transport seam) +- **VRpcTracer.java** — interface: `onOperationStart` / `onAttemptStart(req)` / + `onRequestSent(PeerInfo)` / `onResponseReceived` / `recordApplicationBlockingLatencies(Duration)` / + `onAttemptFinish(result)` / `onOperationFinish(result)`. +- **VRpcTracerImpl.java** — records timers for attempt, operation, client-blocking (start → + request sent), first-response, application-blocking. `onAttemptFinish` records attempt latency + (with + without `PeerInfo`), connectivity-error count, transport latency (attempt − backend), + remaining deadline. `onOperationFinish` records operation latency, app blocking, first response, + retry count. +- **SessionTracer.java** — interface: `onStart` / `onOpen(PeerInfo)` / `onVRpcClose(Code)` / + `onClose(PeerInfo, reason, status)` / `recordAsyncMetrics()` (returns `false` when done). +- **SessionTracerImpl.java** — tracks state + `hasOkRpcs`/`hasErrorRpcs` + `lastPeerInfo`. 
Records + session open latency (also again on close if never ready), session duration with close reason and + vRPC outcome mix. `recordAsyncMetrics` records uptime gauge. +- **CompositeVRpcTracer.java** — fan-out; swallows child exceptions. +- **UserApiVRpcTracer.java** — bridge from new VRpc lifecycle → legacy gax `ApiTracer`. Uses + `VRpcDescriptor.toLegacyProto` for request conversion. Lets user-installed `ApiTracerFactory` + observe ops on the new transport. +- **PoolFallbackListener.java / PoolFallbackListenerImpl.java** — single-method `onFallback(from, + to, reason)`; reasons: `ERROR_RATE`, `FALLBACK_DISABLE`. Fires from `FallbackChannelPool`, + records `channelFallbackCount`. + +### exporter/ (brief) +`BigtableCloudMonitoringExporter` (OTel → Cloud Monitoring `TimeSeries`), +`BigtableFilteringExporter` (predicate on metric name; routes internal metrics to internal CM +project), `BigtablePeriodicReader` (custom OTel `MetricReader` on `ScheduledExecutorService`), +`Converter` (OTel → CM proto). + +### opencensus/ (brief) +`MetricsTracerFactory` / `MetricsTracer` — legacy gax `ApiTracer` impls using OpenCensus +`StatsRecorder`. Wired via `MetricsImpl.createTracerFactory` for classic stack. + +--- + +## dp/ — DirectPath eligibility + +- **DirectAccessChecker.java** — interface: `check(Channel) → boolean` (always shuts down the + channel), `investigateFailure(Throwable)` (background diagnosis). +- **NoopDirectAccessChecker.java** — always `false`. DP disabled. +- **AlwaysEnabledDirectAccessChecker.java** — always `true`. Tests / forced-on. +- **ClassicDirectAccessChecker.java** — production. Wraps probe channel with + `MetadataExtractorInterceptor`, runs `channelPrimer.primeChannel`, reads `PeerInfo.TransportType` + from response headers — true iff `TRANSPORT_TYPE_DIRECT_ACCESS`. `PERMISSION_DENIED` falls back to + ALTS check (`sidebandData.isAlts()`). Other errors → `investigateFailure`. Records success on + `DirectPathCompatibleTracer`. 
Always shuts down channel in `finally`. +- **DirectAccessInvestigator.java** — off-thread diagnosis. `FailureReason` enum: `NOT_IN_GCP`, + `METADATA_UNREACHABLE`, `NO_IP_ASSIGNED`, `LOOPBACK_DOWN`, `LOOPBACK_V4_MISSING`, + `LOOPBACK_V6_MISSING`, `USER_DISABLED`, `UNKNOWN`. Today: always `UNKNOWN` (TODO). Records on + `DirectPathCompatibleTracer`. + +**Where the check fires**: DP eligibility is decided **once at `ShimImpl.create()` startup** (not +per call) using a temporary probe channel (`GetClientConfiguration`, 5s deadline). The `channels/` +layer also uses `ClassicDirectAccessChecker` at session-open time via `ChannelPool`. `ShimImpl` +inlines its own probe logic rather than calling the checker. + +--- + +## End-to-end call flow: `client.openTableAsync("t", READ_WRITE).readRow(req, deadline)` + +1. `TableAsync.createAndStart()` → `TableBase.createAndStart()` → new `SessionPoolImpl` → + `pool.start(OpenTableRequest, md)`. +2. `SessionPoolImpl.start()` pre-creates sessions per `PoolSizer.getScaleDelta()`. Each + `createSession()` → `SessionFactory.createNew()` → `channelPool.newStream(method, callOpts)`. +3. `SwitchingChannelPool` → underlying pool (`FallbackChannelPool` / `ChannelPoolDpImpl` / single) + → gRPC `ClientCall` wrapped in `SessionStreamImpl`. +4. `new SessionImpl(metrics, poolInfo, n, stream)` → `session.start(openReq, headers, listener)` → + wraps openReq in `SessionRequest` → `stream.start` + `stream.sendMessage`. + `SessionStreamImpl` does `call.start` + `call.request(MAX_VALUE)`. +5. Server headers → `SessionStreamImpl` decodes `bigtable-peer-info`, fires + `onBeforeSessionStart(peerInfo)`. In `ChannelPoolDpImpl`, this rehomes the channel into its + AFE group. Then server sends `OPEN_SESSION` in the stream → `SessionImpl.handleOpenSessionResponse` + → READY → `SessionPool.onSessionReady` → added to AFE queue in `SessionList`. +6. 
User → `tableAsync.readRow(req, deadline)` → `TableBase.readRow` wraps `RetryingVRpc` around + `sessionPool.newCall(READ_ROW)` supplier. `SessionPoolImpl.newCall()` → `DynamicPicker` → + `LeastInFlightPicker` → `SessionList.checkoutSession()` → returns `SessionHandle`. + `handle.getSession().newCall(READ_ROW)` → `VRpcImpl`. +7. `VRpcImpl.start(req, ctx, listener)`: encodes typed request via `VRpcDescriptor.READ_ROW.encode`, + builds `VirtualRpcRequest` → `session.startRpc(this, payload)` → + `stream.sendMessage(SessionRequest{ virtualRpc: payload })`. +8. Server replies `SessionResponse{ virtualRpcResponse }` → `SessionImpl.handleVRpcResponse` → + `VRpcImpl.handleResponse` → `VRpcDescriptor.READ_ROW.decode` → `listener.onMessage(resp)` → + `listener.onClose(OK)`. `UnaryResponseFuture` completes the `CompletableFuture`. +9. `SessionImpl` clears `currentRpc`, calls `onVRpcComplete()` in the pool → `handle.onVRpcFinish()` + re-queues session + updates `AfeHandle` latency → `tryDrainPendingRpcs()` dispatches anything + queued. + +--- + +## Key invariants & landmarks + +- **Per-AFE channel**: `ChannelPoolDpImpl` keeps one physical gRPC channel per AFE, sized to demand. +- **Session = bidi stream**: One `SessionStream` = one bidi `ClientCall`; many sequential vRPCs reuse it. +- **vRPC multiplexing**: Today one-at-a-time per session (TODO comment in `Session.java`). +- **Picker drives load balancing across AFEs**, not across sessions within an AFE (per-AFE queue is FIFO). +- **Retry is custom**, not gax — see `RetryingVRpc`. Streaming retry not implemented. +- **Rollout** is server-controlled: `ClientConfigurationManager` ships a `sessionLoad` float; the + shim splits traffic per call. Only `ReadRow` + `MutateRow` are wired in today. +- **Fallback to classic** when the new pool hits `consecutiveUnimplementedFailures >= 30` (and no + active session). 
+- **DirectPath fallback** between primary/secondary pools is automatic and bidirectional via + `FallbackChannelPool`'s error-rate check. diff --git a/java-bigtable/THREADING_REFACTOR_PLAN.md b/java-bigtable/THREADING_REFACTOR_PLAN.md new file mode 100644 index 000000000000..9dc31cf1b7c8 --- /dev/null +++ b/java-bigtable/THREADING_REFACTOR_PLAN.md @@ -0,0 +1,424 @@ +# Threading Refactor Implementation Plan + +Target model: `DirectExecutor` on gRPC session streams + per-session `SynchronizationContext` + +per-op `SerializingExecutor(CachedThreadPool)`. See `THREADING_MODEL.md` for rationale. + +Each step below is intended to be a single reviewable PR. Steps within a phase depend on the +previous step. Phase 5 (per-AFE sharding) is independent of all other phases and can run in +parallel. + +All affected packages live under `com.google.cloud.bigtable.data.v2.internal`, so API +signatures (e.g. `VRpcSessionApi`, `RetryingVRpc`, `TableBase`) can be changed freely without +backwards-compatibility concerns. + +## Progress + +| Step | Status | Commit | +|------|--------|--------| +| 1 | ✅ done | `67f89c5101a` | +| 2 | ✅ done | `fead999fa29` | +| 3 | ✅ done | `cea26faf924` | +| 4 | ✅ done | `defa1e075b5` | +| 5 | ✅ done | `03f3d522103` | +| 5.5 | ⬜ pending | | +| 6 | ✅ done | `a389e141938` | +| 7 | ✅ done | `d98b10f028e` | +| 8 | ✅ done | `223603653ef` | +| 9 | ✅ done | `1a5bf3bee1c` | +| 10 | ✅ done | `e14eb711f62` | +| 11 | ⬜ pending | | +| 12 | ⬜ pending | | +| 13 | ⬜ pending | | + +--- + +## Phase 0: Pre-cleanup + +### Step 1 — Rationalize `SessionImpl` field visibility + +`heartbeatInterval` is written inside `synchronized(lock)` in `handleSessionParamsResponse` but +read outside the lock in `startRpc()` and `handleHeartBeatResponse()`. `nextHeartbeat` is +written from multiple places outside the lock. Both are `volatile`, which papers over the +race but creates an inconsistent ownership model that will need to be resolved before the lock +is removed in Phase 2. 
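A minimal sketch of the ownership model this step aims for, under stated assumptions: `HeartbeatOwnershipSketch` is a hypothetical stand-in for `SessionImpl`, and the method bodies are illustrative rather than the real implementation. The lock-owned field is captured into a local while the lock is held, and the field that is intentionally read outside the lock stays `volatile` and carries a comment saying why.

```java
import java.time.Duration;

/** Hypothetical stand-in for SessionImpl; names mirror the plan, bodies are illustrative. */
final class HeartbeatOwnershipSketch {
  private final Object lock = new Object();

  // Guarded by lock: written by the session-params handler, read by startRpc().
  private Duration heartbeatInterval = Duration.ofMillis(100);

  // Intentionally accessed outside the lock; a stale read only shifts one heartbeat check.
  private volatile long nextHeartbeatNanos;

  void handleSessionParamsResponse(Duration newInterval) {
    synchronized (lock) {
      heartbeatInterval = newInterval;
    }
  }

  /** Computes and stores the next heartbeat deadline; returns it for illustration. */
  long startRpc(long nowNanos) {
    Duration interval;
    synchronized (lock) {
      interval = heartbeatInterval; // capture to a local while holding the lock
    }
    nextHeartbeatNanos = nowNanos + interval.toNanos();
    return nextHeartbeatNanos;
  }
}
```

With this shape every access to `heartbeatInterval` has a single owner (the lock), so the later move to a session-owned executor only has to replace that one owner.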
+ +**Changes:** +- Move the `heartbeatInterval` read in `startRpc()` inside `synchronized(lock)` (capture to a + local before the block if needed for `nextHeartbeat` assignment). +- Add `@GuardedBy("lock")` to every field that is only accessed under the lock. +- Add explicit comments on fields intentionally read outside the lock (`nextHeartbeat`, + `openParams`, `closeReason`) documenting why a stale read is acceptable there. +- Audit the `@SuppressWarnings("GuardedBy")` annotations on both `SessionPoolImpl` + constructors and the untyped `Object lock` parameter on `Watchdog`. Document what each is + actually guarding (or what contract it admits violating) in a code comment, so Phase 5 has + a clear answer when it redesigns the locking model. The suppressions stay in place until + Phase 5; this step only captures ground truth. + +**Nothing changes at runtime. Tests pass unchanged.** + +--- + +## Phase 1: Plumb the op executor through the stack + +No behavior changes in this phase. The goal is to establish `ctx.getExecutor()` as the +single serialization point for op-level state, while keeping functional behavior identical. + +### Step 2 — Add `Executor` to `VRpcCallContext`; `VRpcImpl` stores `ctx` + +**Changes:** +- Add `abstract Executor getExecutor()` to `VRpcCallContext`. Default factory + (`VRpcCallContext.create(...)`) passes `MoreExecutors.directExecutor()`. +- `VRpcImpl.start()` stores the `VRpcCallContext` in a field (`this.ctx = ctx`). + +`directExecutor()` runs tasks inline on the calling thread, so every call to +`ctx.getExecutor().execute(task)` is identical to calling `task.run()` directly. No +threading change. + +**Nothing changes at runtime. Tests pass unchanged.** + +--- + +### Step 3 — Extract `SynchronizationContext` from `RetryingVRpc` to `TableBase` + +Currently `RetryingVRpc` creates its own `SynchronizationContext` in its constructor. This +step moves that creation one level up so the same context is accessible via `ctx.getExecutor()`. 
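The idea can be sketched without gRPC on the classpath. In the sketch below, `SerializingContext` is a simplified, hypothetical stand-in for `io.grpc.SynchronizationContext` (tasks run one at a time, in submission order, on whichever thread is draining), and `Retrying` / `CallContext` are placeholders for `RetryingVRpc` / `VRpcCallContext`. It only illustrates that a single context created by the caller serializes work submitted through either reference.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

final class SharedContextSketch {
  /** Simplified serializing executor; not the real io.grpc.SynchronizationContext. */
  static final class SerializingContext implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    @Override
    public void execute(Runnable task) {
      synchronized (queue) {
        queue.add(task);
        if (draining) {
          return; // the thread that is already draining will run this task
        }
        draining = true;
      }
      while (true) {
        Runnable next;
        synchronized (queue) {
          next = queue.poll();
          if (next == null) {
            draining = false;
            return;
          }
        }
        try {
          next.run();
        } catch (RuntimeException e) {
          // Swallowed here for brevity; the real class reports this to an
          // uncaught-exception handler supplied at construction.
        }
      }
    }
  }

  /** Placeholder for RetryingVRpc: receives the context instead of creating it. */
  static final class Retrying {
    final Executor syncContext;

    Retrying(Executor syncContext) {
      this.syncContext = syncContext;
    }
  }

  /** Placeholder for VRpcCallContext. */
  static final class CallContext {
    private final Executor executor;

    CallContext(Executor executor) {
      this.executor = executor;
    }

    Executor getExecutor() {
      return executor;
    }
  }

  static List<String> demo() {
    SerializingContext perCall = new SerializingContext(); // created one level up
    Retrying retry = new Retrying(perCall);
    CallContext ctx = new CallContext(perCall);

    List<String> events = new ArrayList<>();
    ctx.getExecutor().execute(() -> {
      events.add("response:start");
      // A cancel submitted while a response is being handled is queued, not interleaved.
      retry.syncContext.execute(() -> events.add("cancel"));
      events.add("response:end");
    });
    events.add("same=" + (retry.syncContext == ctx.getExecutor()));
    return events;
  }
}
```

Because both references point at the same object, a cancel issued through the retry layer cannot interleave with a response delivered through `ctx.getExecutor()`.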
+ +**Changes:** +- `TableBase.readRow()` and `mutateRow()` create a `SynchronizationContext` per call and pass + it to both the `RetryingVRpc` constructor and `VRpcCallContext.create(...)`. +- `RetryingVRpc` constructor accepts a `SynchronizationContext` instead of creating one. +- `ctx.getExecutor()` now returns the same `SynchronizationContext` that `RetryingVRpc` holds. + +`CancellableVRpc` (in `middleware/`) sits in the chain between `TableBase` and `RetryingVRpc`. +It does **not** own a `SynchronizationContext` of its own — it propagates `ctx` (which carries +the executor) to the wrapped `RetryingVRpc`. No state changes needed in `CancellableVRpc`; it +simply forwards the `ctx` it was already forwarding. + +`RetryingVRpc.cancel()` continues to use its constructor-provided `SynchronizationContext` +(same object as `ctx.getExecutor()`), so pre-start cancel is still safe. + +**Behavior is identical — same SynchronizationContext, just created one level higher.** + +--- + +### Step 4 — Move callback dispatch from `RetryingVRpc` listener wrapping to `VRpcImpl` + +Currently `RetryingVRpc`'s anonymous `VRpcListener` wraps every callback in +`syncContext.execute(...)`. This step moves that dispatch into `VRpcImpl` so the session layer +can later dispatch to the op executor directly without going through the listener chain. + +**Changes:** +- `VRpcImpl.handleResponse()`, `handleError()`, `handleSessionClose()` each wrap their + `listener.call(...)` invocation in `ctx.getExecutor().execute(...)`. +- Remove the `syncContext.execute(...)` wrapping from `RetryingVRpc.Active`'s `onMessage` and + `onClose` listener bodies. They now run directly inside the already-dispatched executor task. +- `RetryingVRpc.cancel()` is unchanged — it still dispatches to its `SynchronizationContext` + field (same object as `ctx.getExecutor()`). 
+- `RetryingVRpc.Scheduled.onStart` retry-delay scheduling becomes + `scheduledExecutor.schedule(() -> ctx.getExecutor().execute(task), delay)` (replacing the + current `syncContext.schedule(..., executor)` form). + +The serialization guarantee is unchanged: responses and cancels both go through the same +`SynchronizationContext` (responses now via `VRpcImpl`, cancels via `RetryingVRpc.cancel()`). + +**Behavior is identical. Tests pass unchanged.** + +--- + +## Phase 2: Per-session `SynchronizationContext` + +### Step 5 — Add session `SynchronizationContext` alongside the existing lock + +This step introduces the session `SynchronizationContext` without removing any existing +synchronization. The lock and the SyncContext coexist. + +**Changes:** +- `SessionImpl` gets a `SynchronizationContext sessionSyncContext` field, created in the + constructor. +- `SessionStream.Listener.onMessage()` and `onClose()` submit to `sessionSyncContext` instead + of calling `dispatchResponseMessage()` directly. +- Each handler method (`handleVRpcResponse`, `handleGoAwayResponse`, `handleHeartBeatResponse`, + etc.) runs inside a `sessionSyncContext` task. Existing `synchronized(lock)` blocks remain + inside those tasks. +- Heartbeat scheduling is rewritten so the scheduled callback dispatches onto + `sessionSyncContext`: + ```java + scheduledExecutor.schedule( + () -> sessionSyncContext.execute(this::sendHeartbeat), delay, MILLISECONDS); + ``` + Reads of `heartbeatInterval` and `nextHeartbeat` inside the heartbeat callback now happen on + the session SyncContext. This is the prerequisite for removing their `volatile` declarations + in Step 7. +- External callers (`startRpc`, `cancelRpc`, `forceClose`, `close`) are unchanged for now. + +All session callbacks (including heartbeat ticks) are now serialized by `sessionSyncContext` +as well as by the existing lock. The lock is now redundant for the response handlers but +harmless. + +**Behavior is identical. 
Tests pass unchanged.** + +--- + +### Step 5.5 — Switch heartbeat and deadline monitoring to `HashedWheelTimer` + +Step 5 introduced per-session heartbeat scheduling at ~100 ms intervals. At this frequency, +heartbeat tasks generate 10N `schedule()` calls/sec for N sessions — each an O(log n) insert +into the `ScheduledThreadPoolExecutor`'s `DelayQueue`. `PendingVRpc.monitorDeadline()` compounds +this: deadline futures are created and cancelled within ~1 ms (vRPC p50 session-wait), but +`cancel(false)` leaves them in the heap until natural expiry, inflating n and raising the cost +of every heartbeat insert. See ISSUE-007. + +**Changes:** +- Add a `HashedWheelTimer` to `SessionPoolImpl` — one per pool, created in the constructor, + stopped in `close()`. Tick duration: 10 ms, 512 buckets (one rotation ≈ 5 s). The tick thread + is pool-internal and must not block; all real work is submitted to `sessionSyncContext` or + `ctx.getExecutor()`. +- Pass the `HashedWheelTimer` to `SessionImpl` (constructor parameter). Replace the + `scheduledExecutor.schedule(...)` call in `scheduleHeartbeatCheck()` with + `timer.newTimeout(...)`. The callback body is unchanged: submit `this::checkHeartbeat` to + `sessionSyncContext`. +- In `PendingVRpc`, replace `executorService.schedule(...)` in `monitorDeadline()` with + `timer.newTimeout(...)`. `Timeout.cancel()` is O(1) and does not leave zombie entries. +- The `ScheduledExecutorService` (`executorService`) is retained for retry-delay scheduling in + `RetryingVRpc` — it is not replaced, only relieved of heartbeat and deadline-monitoring work. +- Close the `HashedWheelTimer` in `SessionPoolImpl.close()` after cancelling pending RPCs and + closing sessions, so in-flight heartbeat callbacks can still submit to `sessionSyncContext` + during the drain. + +**Result:** heartbeat and deadline-monitor scheduling become O(1) insert and O(1) cancel with +no zombie accumulation. 
The `ScheduledExecutorService` queue depth drops to the number of +pending retry delays only. + +**Small change. No behavioral change.** + +--- + +### Step 6 — Make `startRpc()` and `cancelRpc()` async + +`startRpc()` currently returns a `Status` synchronously. Under the new model it must submit to +`sessionSyncContext` and return immediately, delivering errors via the op executor. + +**Call graph (current):** +- `RetryingVRpc.Active.onStart()` constructs the attempt (a `VRpcImpl`) and calls + `attempt.start(req, ctx, listener)`. +- `VRpcImpl.start()` calls `sessionApi.startRpc(this, req)` and receives a `Status`. +- If the `Status` is non-OK, `VRpcImpl.start()` synchronously calls + `listener.onClose(status, ...)`. + +`VRpcImpl.start()` is the only caller of `VRpcSessionApi.startRpc()`. + +**Changes:** +- `VRpcSessionApi.startRpc(VRpcImpl, VirtualRpcRequest)` becomes `void`. The implementation + inside `SessionImpl` runs as a `sessionSyncContext` task; on validation failure it calls + `ctx.getExecutor().execute(() -> listener.onClose(error))` rather than returning a `Status`. +- `VRpcImpl.start()` no longer has a synchronous error branch. It calls + `sessionApi.startRpc(this, req)` and returns. All errors flow through the async + `listener.onClose` path that was established in Step 4. +- `RetryingVRpc.Active.onStart()` is structurally unchanged — it already relies on + `listener.onClose` for failure delivery. The only difference is that the failure now always + arrives via the op executor instead of being delivered synchronously inline. +- `cancelRpc()` submits to `sessionSyncContext` (it was already void; only the execution site + changes). +- `VRpcSessionApi` interface updated to reflect the new signatures. Because `VRpcSessionApi` + lives in `internal`, the signature change has no compatibility surface. + +The existing `synchronized(lock)` blocks inside the session SyncContext tasks remain. + +**Medium noise (VRpcImpl.start() restructured). 
Behavior unchanged from caller perspective.** + +--- + +### Step 7 — Remove `synchronized(lock)` from `SessionImpl` + +All session state is now protected by `sessionSyncContext` (established in steps 5 and 6, +including the heartbeat path). The lock is redundant. + +**Changes:** +- Remove all `synchronized(lock)` blocks from `SessionImpl`. State mutations happen directly + inside `sessionSyncContext` tasks. +- Remove the `lock` field. +- Remove `volatile` declarations from fields previously requiring them to bridge lock/non-lock + reads (`heartbeatInterval`, `nextHeartbeat`, `openParams`, `openParamsUpdated`). All accesses + are now on the session SyncContext and the happens-before relationship is provided by it. +- The `closeReason` read site in `dispatchStreamClosed` is now inside a `sessionSyncContext` + task, so its read is safely ordered. +- Remove `@GuardedBy` annotations added in Step 1. + +**Medium noise (mechanical removal). Behavior unchanged.** + +--- + +## Phase 3: Switch executor implementations + +### Step 8 — Switch op executor to `SerializingExecutor(userCallbackExecutor)` + +Replace the per-op `SynchronizationContext` with a `SerializingExecutor` backed by a +user-supplied callback executor. This ensures user callbacks always run on a pool thread and +never inline on the gRPC callback thread. + +**Changes:** +- `TableBase` accepts a `userCallbackExecutor` (`Executor`) as a constructor parameter. The + default — wired via the gax `TransportChannelProvider` / `ClientSettings` — is a shared + `CachedThreadPool`. Users may supply any other `Executor` (e.g. a bounded + `ThreadPoolExecutor`) through the same plumbing. +- `TableBase.readRow()` and `mutateRow()` create `new SerializingExecutor(userCallbackExecutor)` + per call and pass it as `ctx.getExecutor()`. +- `RetryingVRpc` constructor now receives a plain `Executor` (the `SerializingExecutor`) instead + of a `SynchronizationContext`. The internal `SynchronizationContext` field is removed. 
+ `cancel()` submits to this executor. +- Retry delay scheduling: `scheduledExecutor.schedule(() -> ctx.getExecutor().execute(task), ...)` + (already in the form established by Step 4). + +Notes: +- The default `CachedThreadPool` is unbounded; the per-op `SerializingExecutor` bounds the + parallelism per op to 1, but total thread count scales with the number of concurrent ops. + Users that need a bound supply their own executor. +- `SerializingExecutor`'s queue is unbounded; if user callbacks block, work accumulates in + memory. This is the user's responsibility to manage via their executor choice and callback + hygiene. + +After this step user callbacks run on the configured callback executor. The gRPC callback +thread (or Netty, after the next step) is freed as soon as it submits to the +`SerializingExecutor`. + +**User-visible change: callbacks run on a different thread.** + +--- + +### Step 9 — Configure gRPC session streams with `DirectExecutor` + +**Changes:** +- Pass `CallOptions.withExecutor(MoreExecutors.directExecutor())` when creating the + `ClientCall` for the session stream in `SingleChannelPool` / `ChannelPoolDpImpl` (wherever + `newCall()` is invoked on the channel for session streams). +- The gRPC user thread pool is no longer used for session stream callbacks. Netty threads + deliver `SessionStream.Listener` callbacks directly. +- Netty trampolines through the session `SynchronizationContext` (fast, bounded bookkeeping), + then submits to the `SerializingExecutor` and returns. +- Add a class-level comment on `SessionStream.Listener` (and a comment on + `SessionImpl.sessionSyncContext`) documenting the invariant: + + > **Invariant:** callbacks delivered here run on Netty I/O threads via + > `DirectExecutor`. All work must be fast and non-blocking — any user-facing or potentially + > blocking work must be dispatched onto the op executor (`ctx.getExecutor()`) before + > returning. Violating this stalls the channel. 
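The hand-off the invariant requires can be sketched with plain `java.util.concurrent` types. Everything here is an assumption made for illustration: the thread names, the single-thread `callbackPool` standing in for the op executor, and `Runnable::run` standing in for a direct executor. The listener body does only cheap bookkeeping on the I/O thread and pushes the user-facing callback onto the pool before returning.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DirectExecutorHandoffSketch {
  /** Returns {thread that ran the listener, thread that ran the user callback}. */
  static String[] demo() {
    Executor direct = Runnable::run; // runs the task inline on the calling thread
    ExecutorService callbackPool =
        Executors.newSingleThreadExecutor(
            r -> {
              Thread t = new Thread(r, "callback-pool");
              t.setDaemon(true);
              return t;
            });

    CompletableFuture<String> listenerThread = new CompletableFuture<>();
    CompletableFuture<String> userThread = new CompletableFuture<>();

    // Simulated I/O thread delivering a stream callback through the direct executor.
    Thread ioThread =
        new Thread(
            () ->
                direct.execute(
                    () -> {
                      // Fast, non-blocking bookkeeping happens here.
                      listenerThread.complete(Thread.currentThread().getName());
                      // Potentially blocking user work is handed off before returning.
                      callbackPool.execute(
                          () -> userThread.complete(Thread.currentThread().getName()));
                    }),
            "io-thread");
    ioThread.start();

    try {
      return new String[] {
        listenerThread.get(5, TimeUnit.SECONDS), userThread.get(5, TimeUnit.SECONDS)
      };
    } catch (Exception e) {
      throw new IllegalStateException("hand-off did not complete", e);
    } finally {
      callbackPool.shutdown();
    }
  }
}
```

The listener runs on the simulated I/O thread while the user callback runs on the pool thread, which is the separation the invariant is meant to guarantee.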
+ +(Whether to additionally support a configuration that keeps the gRPC executor + per-session +`SynchronizationContext` is deferred pending benchmarks. Both modes could potentially ship +side-by-side selectable via `ClientConfig`.) + +**Small change (one `CallOptions` line + invariant comments). Performance improvement, no +behavioral change.** + +--- + +## Phase 4: `PendingVRpc` cleanup + +### Step 10 — Route `PendingVRpc` per-op state through the op executor + +Currently `PendingVRpc.drainTo()` and `cancel()` protect `isCancelled` and `realCall` using +the pool lock. These are per-op concerns and should be on the op executor. + +**Changes:** +- `drainTo(SessionHandle handle)`: + ```java + synchronized (SessionPoolImpl.this) { + pendingRpcs.remove(this); // pool lock: queue mutation only + } + ctx.getExecutor().execute(() -> { + if (isCancelled) return; + realCall = newRealCall(desc, handle); + realCall.start(req, ctx, listener); + }); + ``` +- `cancel(Status status, boolean onlyCancelPendingCall)`: submits the isCancelled check and + `realCall` assignment/delegation to `ctx.getExecutor()`. +- Pool lock now covers only pool-level state: `pendingRpcs` queue, `poolState`, session list. +- The `NOOP_CALL` sentinel (used to paper over the cancel-before-start race) is no longer + needed and can be removed. + +Because `ctx.getExecutor()` is a `SerializingExecutor`, `drainTo` and `cancel` are strictly +ordered. The cancel-before-start race is eliminated structurally. + +**Medium noise.** + +--- + +## Phase 5: Per-AFE lock sharding (independent track) + +This phase can proceed in parallel with or after Phases 0–4. It addresses pool-checkout +contention under high-concurrency multiplexing. The `AfeHandle` in `SessionList` groups sessions +by Application Frontend, but all operations currently serialize on one pool-wide lock. + +### Step 11 — Expose `AfeId` from `Picker` + +Currently `Picker.pickSession()` returns a `SessionHandle`. 
To shard the lock per-AFE the +caller needs to know which AFE was selected. + +**Changes:** +- `Picker.PickResult` (or equivalent) carries `(SessionHandle, AfeId)`. +- `SessionPoolImpl.tryDrainPendingRpcs()` records the selected `AfeId` for use in the + subsequent lock acquisition. +- No lock sharding yet — `AfeId` is threaded through but the single lock remains. + +**Small. Behavior unchanged.** + +--- + +### Step 12 — Give `AfeHandle` its own lock + +**Changes:** +- `AfeHandle` gets a `final Object lock = new Object()` field. +- Per-AFE mutable state (`sessions` queue, `refCount`, `lastConnected`, `PeakEwma`) is + accessed under `AfeHandle.lock` instead of the pool-wide lock. +- `SessionList` operations that touch a single AFE acquire `AfeHandle.lock` directly. +- Operations that touch multiple AFEs or pool-wide state (session creation, pool lifecycle) + still acquire the pool-wide lock. + +**Lock-ordering rule (enforced from this step onward):** + +> Pool-wide lock (`SessionPoolImpl.this`) is the **outer** lock. `AfeHandle.lock` is the +> **inner** lock. Any code path that needs both MUST acquire the pool lock first, then the +> AFE lock. Never the reverse. Paths that need only one acquire it directly. + +Document the rule with a class-level comment on `SessionPoolImpl` and a field comment on +`AfeHandle.lock`. + +**Resolving pre-existing locking debt (carried over from Step 1's audit):** +- Remove the `@SuppressWarnings("GuardedBy")` from both `SessionPoolImpl` constructors. With + the lock model now explicit (outer pool, inner AFE, defined ordering), either replace each + suppression with a correct `@GuardedBy` annotation or remove it if the constructor path is + genuinely lock-free. +- Type `Watchdog`'s lock parameter properly. Decide whether it needs the pool-wide lock, an + `AfeHandle.lock`, or both (and in what order), then change the parameter type from `Object` + to the actual lock type(s) and add correct `@GuardedBy` annotations. 
Remove the + `// TODO: fix lock sharing` comment. + +**Medium noise. Behavior unchanged.** + +--- + +### Step 13 — Shard `SessionPoolImpl` checkout/return per AFE + +**Changes:** +- `tryDrainPendingRpcs()`: after picking a session (which identifies an AFE), acquires + `afeHandle.lock` for the checkout, not the pool-wide lock. +- `onVRpcComplete()`: returns the session to `afeHandle.sessions` under `afeHandle.lock`. +- The pool-wide lock scope narrows to: pool lifecycle (`poolState`), session creation/removal, + and operations that must see the full session list. +- All paths that acquire both locks follow the ordering rule established in Step 12. + +**Medium-large. Primary performance win for high-concurrency multiplexing.** + +--- + +## Summary + +| Phase | Steps | Key outcome | +|---|---|---| +| 0 | 1 | Lock ownership documented, volatile races fixed | +| 1 | 2–4 | `ctx.getExecutor()` plumbed, dispatch in `VRpcImpl`, `CancellableVRpc` forwards `ctx` | +| 2 | 5–7 | Session state (including heartbeat) owned by `SynchronizationContext`, lock removed | +| 3 | 8–9 | User callbacks on `SerializingExecutor(userCallbackExecutor)`, Netty as callback thread with documented invariant | +| 4 | 10 | Pool lock owns only pool topology; cancel-before-start race eliminated | +| 5 | 11–13 | Per-AFE lock sharding for multiplexing throughput, with explicit lock-ordering rule | diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/api/TableBase.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/api/TableBase.java index 8feef399d625..180b7577081f 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/api/TableBase.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/api/TableBase.java @@ -34,16 +34,19 @@ import com.google.cloud.bigtable.data.v2.internal.session.VRpcDescriptor; import 
com.google.cloud.bigtable.data.v2.internal.util.ClientConfigurationManager; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Message; import io.grpc.CallOptions; import io.grpc.Context; import io.grpc.Deadline; import io.grpc.Metadata; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; class TableBase implements AutoCloseable { private final SessionPool sessionPool; private final ScheduledExecutorService backgroundExecutor; + private final Executor userCallbackExecutor; private final Metrics metrics; private final VRpcDescriptor readRowDescriptor; private final VRpcDescriptor @@ -77,7 +80,7 @@ static TableBase createAndStart( sessionPool.start(openReq, new Metadata()); - return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor); + return new TableBase(sessionPool, readRowDescriptor, mutateRowDescriptor, metrics, executor, executor); } @VisibleForTesting @@ -86,12 +89,14 @@ static TableBase createAndStart( VRpcDescriptor readRowDescriptor, VRpcDescriptor mutateRowDescriptor, Metrics metrics, - ScheduledExecutorService executor) { + ScheduledExecutorService backgroundExecutor, + Executor userCallbackExecutor) { this.sessionPool = sessionPool; this.readRowDescriptor = readRowDescriptor; this.mutateRowDescriptor = mutateRowDescriptor; this.metrics = metrics; - this.backgroundExecutor = executor; + this.backgroundExecutor = backgroundExecutor; + this.userCallbackExecutor = userCallbackExecutor; } @Override @@ -109,11 +114,12 @@ public SessionPool getSessionPool() { public void readRow( SessionReadRowRequest req, VRpcListener listener, Deadline deadline) { + Executor opExecutor = MoreExecutors.newSequentialExecutor(userCallbackExecutor); RetryingVRpc retry = - new RetryingVRpc<>(() -> sessionPool.newCall(readRowDescriptor), backgroundExecutor); + new RetryingVRpc<>(() -> 
sessionPool.newCall(readRowDescriptor), backgroundExecutor, opExecutor); VRpcTracer tracer = metrics.newTableTracer(sessionPool.getInfo(), readRowDescriptor, deadline); - VRpcCallContext ctx = VRpcCallContext.create(deadline, true, tracer); + VRpcCallContext ctx = VRpcCallContext.create(deadline, true, tracer, opExecutor); CancellableVRpc cancellableVRpc = new CancellableVRpc<>(retry, Context.current()); @@ -125,14 +131,16 @@ public void mutateRow( SessionMutateRowRequest req, VRpcListener listener, Deadline deadline) { + Executor opExecutor = MoreExecutors.newSequentialExecutor(userCallbackExecutor); RetryingVRpc retry = - new RetryingVRpc<>(() -> sessionPool.newCall(mutateRowDescriptor), backgroundExecutor); + new RetryingVRpc<>( + () -> sessionPool.newCall(mutateRowDescriptor), backgroundExecutor, opExecutor); boolean idempotent = Util.isIdempotent(req.getMutationsList()); VRpcTracer tracer = metrics.newTableTracer(sessionPool.getInfo(), mutateRowDescriptor, deadline); - VRpcCallContext ctx = VRpcCallContext.create(deadline, idempotent, tracer); + VRpcCallContext ctx = VRpcCallContext.create(deadline, idempotent, tracer, opExecutor); CancellableVRpc cancellable = new CancellableVRpc<>(retry, Context.current()); diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java index 9ef2a486f195..e3d6433ad0ed 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/ChannelPoolDpImpl.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; +import 
com.google.common.util.concurrent.MoreExecutors; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.ManagedChannel; @@ -206,8 +207,11 @@ public synchronized SessionStream newStream( channelWrapper.group.numStreams++; totalStreams++; + // DirectExecutor: gRPC/Netty delivers SessionStream.Listener callbacks directly on the + // I/O thread. All work must be fast and non-blocking; blocking work goes to the op executor. ClientCall innerCall = - channelWrapper.channel.newCall(desc, callOptions); + channelWrapper.channel.newCall( + desc, callOptions.withExecutor(MoreExecutors.directExecutor())); return new SessionStreamImpl(innerCall) { // mark as null so that onClose can tell if onBeforeSessionStart was never called diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SessionStream.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SessionStream.java index ebbc39af7f2c..eb6dbfa339eb 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SessionStream.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SessionStream.java @@ -37,6 +37,14 @@ public interface SessionStream { public void forceClose(@Nullable String message, @Nullable Throwable cause); + /** + * Callbacks for session stream events. + * + * Invariant: callbacks are delivered on Netty I/O threads via {@code DirectExecutor}. + * All work must be fast and non-blocking; any user-facing or potentially blocking work must be + * dispatched onto the session {@code SynchronizationContext} (which then forwards to the op + * executor) before returning. Violating this stalls the channel. + */ public interface Listener { void onBeforeSessionStart(PeerInfo peerInfo); diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SingleChannelPool.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SingleChannelPool.java index 6d40b58d53d4..bfd099f60d51 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SingleChannelPool.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/channels/SingleChannelPool.java @@ -19,6 +19,7 @@ import com.google.bigtable.v2.SessionClientConfiguration.ChannelPoolConfiguration; import com.google.bigtable.v2.SessionRequest; import com.google.bigtable.v2.SessionResponse; +import com.google.common.util.concurrent.MoreExecutors; import io.grpc.CallOptions; import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; @@ -45,7 +46,10 @@ public void close() { @Override public SessionStream newStream( MethodDescriptor desc, CallOptions callOptions) { - return new SessionStreamImpl(channel.newCall(desc, callOptions)); + // DirectExecutor: gRPC/Netty delivers SessionStream.Listener callbacks directly on the + // I/O thread. All work must be fast and non-blocking; blocking work goes to the op executor. 
+ return new SessionStreamImpl( + channel.newCall(desc, callOptions.withExecutor(MoreExecutors.directExecutor()))); } @Override diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpc.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpc.java index d6048bfb9140..66f1e6440888 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpc.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpc.java @@ -23,10 +23,11 @@ import com.google.rpc.RetryInfo; import io.grpc.Context; import io.grpc.Status; -import io.grpc.SynchronizationContext; import java.util.Optional; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.logging.Level; @@ -46,8 +47,8 @@ public class RetryingVRpc implements VRpc { private VRpcCallContext context; private VRpcTracer tracer; - private final ScheduledExecutorService executor; - private final SynchronizationContext syncContext; + private final ScheduledExecutorService scheduledExecutor; + private final Executor opExecutor; // current state and all the flags don't need to be volatile because they're only updated within // the sync context. @@ -56,21 +57,17 @@ public class RetryingVRpc implements VRpc { // Breaks the loop if uncaught exception happens during sync context execution. 
private boolean isCancelling; - public RetryingVRpc(Supplier> supplier, ScheduledExecutorService executor) { + public RetryingVRpc( + Supplier> supplier, + ScheduledExecutorService scheduledExecutor, + Executor opExecutor) { this.attemptFactory = supplier; grpcContext = Context.current(); otelContext = io.opentelemetry.context.Context.current(); - this.executor = otelContext.wrap(executor); - this.syncContext = - new SynchronizationContext( - (t, e) -> { - this.cancel( - "Unexpected error while notifying the caller of RetryingVRpc. Trying to cancel" - + " vRpc to ensure consistent state", - e); - }); + this.scheduledExecutor = otelContext.wrap(scheduledExecutor); + this.opExecutor = opExecutor; started = false; isCancelling = false; @@ -79,29 +76,33 @@ public RetryingVRpc(Supplier> supplier, ScheduledExecutorServi @Override public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { - syncContext.execute( + opExecutor.execute( () -> { - if (started) { - listener.onClose( - VRpcResult.createRejectedError( - Status.FAILED_PRECONDITION.withDescription("operation is already started"))); - return; - } - started = true; + try { + if (started) { + listener.onClose( + VRpcResult.createRejectedError( + Status.FAILED_PRECONDITION.withDescription("operation is already started"))); + return; + } + started = true; - this.request = req; - this.listener = listener; - this.context = ctx; - this.tracer = context.getTracer(); + this.request = req; + this.listener = listener; + this.context = ctx; + this.tracer = context.getTracer(); - tracer.onOperationStart(); - currentState.onStart(); + tracer.onOperationStart(); + currentState.onStart(); + } catch (Throwable t) { + cancel("Unexpected error in op executor", t); + } }); } @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { - syncContext.execute( + opExecutor.execute( () -> { if (currentState.isDone() || isCancelling) { LOG.fine("Ignoring cancel because the vRPC is already cancelled or 
done."); @@ -132,7 +133,6 @@ public void requestNext() { } void onStateChange(State state) { - syncContext.throwIfNotInThisSynchronizationContext(); if (currentState.isDone()) { return; } @@ -177,55 +177,55 @@ public void onStart() { new VRpcListener() { @Override public void onMessage(RespT msg) { - syncContext.execute( - () -> { - if (currentState != Active.this) { - LOG.log( - Level.FINE, - "Discarding response {0} because the attempt is no longer active.", - msg); - return; - } - tracer.onResponseReceived(); - Stopwatch appTimer = Stopwatch.createStarted(); - try { - listener.onMessage(msg); - } finally { - tracer.recordApplicationBlockingLatencies(appTimer.elapsed()); - } - }); + // VRpcImpl dispatches this callback onto ctx.getExecutor() (the same + // SynchronizationContext syncContext uses), so we are already inside the + // syncContext here — no additional wrapping needed. + if (currentState != Active.this) { + LOG.log( + Level.FINE, + "Discarding response {0} because the attempt is no longer active.", + msg); + return; + } + tracer.onResponseReceived(); + Stopwatch appTimer = Stopwatch.createStarted(); + try { + listener.onMessage(msg); + } finally { + tracer.recordApplicationBlockingLatencies(appTimer.elapsed()); + } } @Override public void onClose(VRpcResult result) { - syncContext.execute( - () -> { - tracer.onAttemptFinish(result); - if (currentState != Active.this) { - LOG.log( - Level.FINE, - "Discarding server close with result {0} because the the attempt is no" - + " longer active.", - result); - return; - } - if (shouldRetry(result)) { - context = context.createForNextAttempt(); - Duration retryDelay = - Optional.ofNullable(result.getRetryInfo()) - .map(RetryInfo::getRetryDelay) - .orElse(Durations.ZERO); - if (Durations.compare(retryDelay, Durations.ZERO) > 0) { - Scheduled scheduled = new Scheduled(retryDelay); - onStateChange(scheduled); - } else { - onStateChange(new Idle()); - } - return; - } - - onStateChange(new Done(result)); - }); + // 
VRpcImpl dispatches this callback onto ctx.getExecutor() (the same + // SynchronizationContext syncContext uses), so we are already inside the + // syncContext here — no additional wrapping needed. + tracer.onAttemptFinish(result); + if (currentState != Active.this) { + LOG.log( + Level.FINE, + "Discarding server close with result {0} because the attempt is no" + + " longer active.", + result); + return; + } + if (shouldRetry(result)) { + context = context.createForNextAttempt(); + Duration retryDelay = + Optional.ofNullable(result.getRetryInfo()) + .map(RetryInfo::getRetryDelay) + .orElse(Durations.ZERO); + if (Durations.compare(retryDelay, Durations.ZERO) > 0) { + Scheduled scheduled = new Scheduled(retryDelay); + onStateChange(scheduled); + } else { + onStateChange(new Idle()); + } + return; + } + + onStateChange(new Done(result)); } }); } @@ -271,7 +271,7 @@ boolean shouldRetry(VRpcResult result) { class Scheduled extends State { private final Duration retryDelay; - private SynchronizationContext.ScheduledHandle future; + private ScheduledFuture future; Scheduled(Duration retryDelay) { this.retryDelay = retryDelay; @@ -281,11 +281,11 @@ class Scheduled extends State { public void onStart() { try { future = - syncContext.schedule( - () -> grpcContext.wrap(() -> onStateChange(new Idle())).run(), + scheduledExecutor.schedule( + grpcContext.wrap( + () -> context.getExecutor().execute(() -> onStateChange(new Idle()))), Durations.toMillis(retryDelay), - TimeUnit.MILLISECONDS, - executor); + TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { onStateChange( new Done( @@ -301,10 +301,9 @@ public void onStart() { public void onCancel(String reason, Throwable throwable) { // future can be null if schedule throws an exception that's not RejectedExecutionException. // In which case sync context uncaught exception handler will be called, which calls cancel on - // the current - // state before transition into done state.
- if (future != null && future.isPending()) { - future.cancel(); + // the current state before transition into done state. + if (future != null && !future.isDone()) { + future.cancel(false); } } } diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/VRpc.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/VRpc.java index a29a51fd72c6..0a52a04505dc 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/VRpc.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/middleware/VRpc.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList; import com.google.protobuf.Any; import com.google.rpc.RetryInfo; +import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Context; import io.grpc.Deadline; import io.grpc.Metadata; @@ -33,6 +34,7 @@ import io.grpc.protobuf.StatusProto; import java.time.Duration; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -92,12 +94,25 @@ abstract class VRpcCallContext { public abstract VRpcTracer getTracer(); + /** + * Executor for op-level callback serialization. Defaults to {@link + * MoreExecutors#directExecutor()} — tasks run inline on the calling thread. Phase 3 will + * replace this with a {@code SerializingExecutor(userCallbackExecutor)} so user callbacks + * always run on a pool thread. 
+ */ + public abstract Executor getExecutor(); + // TODO: csm // Clientside metrics instrument // public abstract BigtableTracer getTracer(); public static VRpcCallContext create( Deadline deadline, boolean isIdempotent, VRpcTracer tracer) { + return create(deadline, isIdempotent, tracer, MoreExecutors.directExecutor()); + } + + public static VRpcCallContext create( + Deadline deadline, boolean isIdempotent, VRpcTracer tracer, Executor executor) { Deadline grpcContextDeadline = Context.current().getDeadline(); @@ -114,12 +129,15 @@ public static VRpcCallContext create( } return new AutoValue_VRpc_VRpcCallContext( - OperationInfo.create(operationTimeout, isIdempotent), "TODO", tracer); + OperationInfo.create(operationTimeout, isIdempotent), + "TODO", + tracer, + executor); } public VRpcCallContext createForNextAttempt() { return new AutoValue_VRpc_VRpcCallContext( - getOperationInfo().createForNextAttempt(), getTraceParent(), getTracer()); + getOperationInfo().createForNextAttempt(), getTraceParent(), getTracer(), getExecutor()); } } diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java index 9ee86bffbdbd..e367fec3168f 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImpl.java @@ -44,15 +44,18 @@ import com.google.protobuf.util.Durations; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.SynchronizationContext; import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Locale; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; /** Wraps a Bidi ClientCall and layers session semantics on top. */ @VisibleForTesting @@ -70,57 +73,62 @@ public class SessionImpl implements Session, VRpcSessionApi { // A time in the future to skip heartbeat checks when there's no active vRPCs on the session static final Duration FUTURE_TIME = Duration.ofMinutes(30); - /* - * This lock should be mostly uncontended - all access should be naturally interleaved. Contention - * can only really happen when an unsolicited gRPC control message (ie GOAWAY) arrives at the same - * time as newCall or cancel. - * TODO: Contention will increase when multiplexing is implemented. - */ - private final Object lock = new Object(); + private static final CloseSessionRequest MISSED_HEARTBEAT_CLOSE_REQUEST = + CloseSessionRequest.newBuilder() + .setReason(CloseSessionReason.CLOSE_SESSION_REASON_MISSED_HEARTBEAT) + .setDescription("missed heartbeat") + .build(); private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + // Serializes all session state mutations. SessionStream.Listener callbacks arrive on Netty I/O + // threads (DirectExecutor) and trampoline through this context before touching session state, + // then dispatch user-facing work to ctx.getExecutor() (the op-level SerializingExecutor). 
+ private final SynchronizationContext sessionSyncContext; private final SessionTracer tracer; private final DebugTagTracer debugTagTracer; private final SessionInfo info; - @GuardedBy("lock") private final SessionStream stream; - @GuardedBy("lock") - private SessionState state = SessionState.NEW; + private volatile SessionState state = SessionState.NEW; - @GuardedBy("lock") - private Instant lastStateChangedAt; + private volatile Instant lastStateChangedAt; + // Set once in start() on sessionSyncContext, then read from gRPC callbacks without locking. + // Safe because start() is always called before any callback fires, so the write is + // visible to all subsequent readers through the happens-before chain from stream.start(). private Listener sessionListener; - private volatile OpenParams openParams; + private OpenParams openParams; - private volatile boolean openParamsUpdated; + private boolean openParamsUpdated; @Nullable private CloseSessionRequest closeReason = null; - @GuardedBy("lock") - private long nextRpcId = 1; + private final AtomicLong nextRpcId = new AtomicLong(1); // TODO: replace with a map when implementing multiplexing - @GuardedBy("lock") private VRpcImpl currentRpc = null; - @GuardedBy("lock") private VRpcResult currentCancel = null; private SessionParametersResponse sessionParameters = DEFAULT_SESSION_PARAMS; - private volatile Duration heartbeatInterval = + + private Duration heartbeatInterval = Duration.ofMillis(Durations.toMillis(sessionParameters.getKeepAlive())); - private volatile Instant nextHeartbeat; + private Instant nextHeartbeat; public SessionImpl( - Metrics metrics, SessionPoolInfo poolInfo, long sessionNum, SessionStream stream) { - this(metrics, Clock.systemUTC(), poolInfo, sessionNum, stream); + Metrics metrics, + SessionPoolInfo poolInfo, + long sessionNum, + SessionStream stream, + ScheduledExecutorService scheduledExecutor) { + this(metrics, Clock.systemUTC(), poolInfo, sessionNum, stream, scheduledExecutor); } SessionImpl( @@
-128,28 +136,35 @@ public SessionImpl( Clock clock, SessionPoolInfo poolInfo, long sessionNum, - SessionStream stream) { + SessionStream stream, + ScheduledExecutorService scheduledExecutor) { this.clock = clock; + this.scheduledExecutor = scheduledExecutor; this.info = SessionInfo.create(poolInfo, sessionNum); this.stream = stream; this.tracer = metrics.newSessionTracer(poolInfo); this.debugTagTracer = metrics.getDebugTagTracer(); this.nextHeartbeat = clock.instant().plus(FUTURE_TIME); this.openParamsUpdated = false; + this.sessionSyncContext = + new SynchronizationContext( + (thread, e) -> + logger.log( + Level.WARNING, + String.format( + "Unhandled exception in session SynchronizationContext for %s", + info.getLogName()), + e)); } @Override public SessionState getState() { - synchronized (lock) { - return state; - } + return state; } @Override public Instant getLastStateChange() { - synchronized (lock) { - return lastStateChangedAt; - } + return lastStateChangedAt; } @Override @@ -169,13 +184,7 @@ public Instant getNextHeartbeat() { @Override public PeerInfo getPeerInfo() { - // This lock might not be necessary, its populated once on a gRPC callback which should - // establish a happens before relationship. However access to the underlying stream is guarded - // with errorprone, so sync block is required to get around the lint. 
- // TODO: consider removing the sync block - synchronized (lock) { - return stream.getPeerInfo(); - } + return stream.getPeerInfo(); } @Override @@ -185,98 +194,101 @@ public String getLogName() { @Override public void forceClose(CloseSessionRequest closeReason) { - synchronized (lock) { - debugTagTracer.checkPrecondition( - state != SessionState.NEW, - "session_force_close_wrong_state", - "Tried to forceClose an unstarted session %s in state %s", - info.getLogName(), - state); - - if (state == SessionState.CLOSED) { - return; - } - - updateState(SessionState.WAIT_SERVER_CLOSE); - this.closeReason = closeReason; - - // Not sending the CloseSessionRequest because cancel() will just drop it - stream.forceClose(closeReason.getDescription(), null); - // Listeners will be notified by dispatchStreamClosed - } + sessionSyncContext.execute( + () -> { + debugTagTracer.checkPrecondition( + state != SessionState.NEW, + "session_force_close_wrong_state", + "Tried to forceClose an unstarted session %s in state %s", + info.getLogName(), + state); + + if (state == SessionState.CLOSED) { + return; + } + + updateState(SessionState.WAIT_SERVER_CLOSE); + this.closeReason = closeReason; + + // Not sending the CloseSessionRequest because cancel() will just drop it + stream.forceClose(closeReason.getDescription(), null); + // Listeners will be notified by dispatchStreamClosed + }); } @Override public void start(OpenSessionRequest req, Metadata headers, Listener sessionListener) { - synchronized (lock) { - debugTagTracer.checkPrecondition( - state == SessionState.NEW, - "session_start_wrong_state", - "Tried to start a started session, current state: %s", - state); - - logger.fine(String.format("Starting session %s", info.getLogName())); - tracer.onStart(); - - updateState(SessionState.STARTING); - openParams = OpenParams.create(headers, req); - this.sessionListener = sessionListener; - - SessionRequest wrappedReq = SessionRequest.newBuilder().setOpenSession(req).build(); - stream.start( 
- new SessionStream.Listener() { - @Override - public void onBeforeSessionStart(PeerInfo peerInfo) {} - - @Override - public void onMessage(SessionResponse message) { - dispatchResponseMessage(message); - } - - @Override - public void onClose(Status status, Metadata trailers) { - dispatchStreamClosed(status, trailers); - } - }, - headers); - - stream.sendMessage(wrappedReq); - } + sessionSyncContext.execute( + () -> { + debugTagTracer.checkPrecondition( + state == SessionState.NEW, + "session_start_wrong_state", + "Tried to start a started session, current state: %s", + state); + + logger.fine(String.format("Starting session %s", info.getLogName())); + tracer.onStart(); + + updateState(SessionState.STARTING); + openParams = OpenParams.create(headers, req); + this.sessionListener = sessionListener; + + SessionRequest wrappedReq = SessionRequest.newBuilder().setOpenSession(req).build(); + stream.start( + new SessionStream.Listener() { + @Override + public void onBeforeSessionStart(PeerInfo peerInfo) {} + + @Override + public void onMessage(SessionResponse message) { + sessionSyncContext.execute(() -> dispatchResponseMessage(message)); + } + + @Override + public void onClose(Status status, Metadata trailers) { + sessionSyncContext.execute(() -> dispatchStreamClosed(status, trailers)); + } + }, + headers); + + stream.sendMessage(wrappedReq); + }); } @Override public void close(CloseSessionRequest req) { logger.fine(String.format("Closing session %s for reason: %s", info.getLogName(), req)); - synchronized (lock) { - // Throw an exception because this is a bug and we dont have a listener - debugTagTracer.checkPrecondition( - state != SessionState.NEW, - "session_close_wrong_state", - "Session error: Caller tried to close session %s before starting it with the reason: %s", - info.getLogName(), - req); - - // Multiple close is a no-op - if (state.phase >= SessionState.CLOSING.phase) { - logger.fine( - String.format( - "Session error: Caller tried to close a session %s 
that is %s for reason: %s", - info.getLogName(), state, req)); - return; - } - - closeReason = req; - updateState(SessionState.CLOSING); - - if (currentRpc == null) { - startGracefulClose(); - } - } + sessionSyncContext.execute( + () -> { + // Throw an exception because this is a bug and we don't have a listener + debugTagTracer.checkPrecondition( + state != SessionState.NEW, + "session_close_wrong_state", + "Session error: Caller tried to close session %s before starting it with the reason:" + + " %s", + info.getLogName(), + req); + + // Multiple close is a no-op + if (state.phase >= SessionState.CLOSING.phase) { + logger.fine( + String.format( + "Session error: Caller tried to close a session %s that is %s for reason: %s", + info.getLogName(), state, req)); + return; + } + + closeReason = req; + updateState(SessionState.CLOSING); + + if (currentRpc == null) { + startGracefulClose(); + } + }); } /** Wraps the flow of closing a session. */ - @GuardedBy("lock") private void startGracefulClose() { debugTagTracer.checkPrecondition( state == SessionState.CLOSING, @@ -316,49 +328,73 @@ VRpc newCall(VRpcDescriptor descriptor) { "session_new_call_wrong_type", "wrong VRpc descriptor type"); - synchronized (lock) { - debugTagTracer.checkPrecondition( - state != SessionState.NEW, - "session_new_call_wrong_state", - "Session error: newCall called before start"); + debugTagTracer.checkPrecondition( + state != SessionState.NEW, + "session_new_call_wrong_state", + "Session error: newCall called before start"); - long rpcId = nextRpcId; - nextRpcId = Math.incrementExact(nextRpcId); - return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo(), debugTagTracer); - } + long rpcId = nextRpcId.getAndIncrement(); + return new VRpcImpl<>(this, descriptor, rpcId, stream.getPeerInfo(), debugTagTracer); } @Override - public Status startRpc(VRpcImpl rpc, VirtualRpcRequest payload) { - // start monitoring for heartbeat when the vrpc is started - this.nextHeartbeat = 
clock.instant().plus(heartbeatInterval); - - synchronized (lock) { - if (currentRpc != null) { - return Status.INTERNAL.withDescription( - "Session error: RPC multiplexing is not yet supported"); - } - if (state != SessionState.READY) { - return Status.INTERNAL.withDescription( - "Session error: Session was not ready, state = " + state); - } - - this.currentRpc = rpc; - stream.sendMessage(SessionRequest.newBuilder().setVirtualRpc(payload).build()); - return Status.OK; - } + public void startRpc(VRpcImpl rpc, VirtualRpcRequest payload) { + sessionSyncContext.execute( + () -> { + if (currentRpc != null) { + rpc.handleError( + VRpcResult.createUncommitedError( + Status.INTERNAL.withDescription( + "Session error: RPC multiplexing is not yet supported"))); + return; + } + if (state != SessionState.READY) { + rpc.handleError( + VRpcResult.createUncommitedError( + Status.INTERNAL.withDescription( + "Session error: Session was not ready, state = " + state))); + return; + } + + this.currentRpc = rpc; + stream.sendMessage(SessionRequest.newBuilder().setVirtualRpc(payload).build()); + this.nextHeartbeat = clock.instant().plus(heartbeatInterval); + }); } @Override public void cancelRpc(long rpcId, @Nullable String message, @Nullable Throwable cause) { - synchronized (lock) { - if (currentRpc != null && rpcId == currentRpc.rpcId) { - currentCancel = - VRpcResult.createRejectedError( - Status.CANCELLED.withDescription(message).withCause(cause)); - } - // do nothing if the rpc is already finished + sessionSyncContext.execute( + () -> { + if (currentRpc != null && rpcId == currentRpc.rpcId) { + currentCancel = + VRpcResult.createRejectedError( + Status.CANCELLED.withDescription(message).withCause(cause)); + } + // do nothing if the rpc is already finished + }); + } + + private void scheduleHeartbeatCheck() { + scheduledExecutor.schedule( + () -> sessionSyncContext.execute(this::checkHeartbeat), + HEARTBEAT_CHECK_INTERVAL.toMillis(), + TimeUnit.MILLISECONDS); + } + + // Runs on 
sessionSyncContext. Checks if the heartbeat deadline has passed and force-closes the + // session if so; otherwise re-schedules itself. + private void checkHeartbeat() { + if (state.phase >= SessionState.WAIT_SERVER_CLOSE.phase) { + return; + } + if (clock.instant().isAfter(nextHeartbeat)) { + logger.warning( + String.format("Missed heartbeat for %s, forcing session close", info.getLogName())); + forceClose(MISSED_HEARTBEAT_CLOSE_REQUEST); + return; } + scheduleHeartbeatCheck(); } // region SessionStream event handlers @@ -394,49 +430,43 @@ private void dispatchResponseMessage(SessionResponse message) { private void handleOpenSessionResponse(OpenSessionResponse openSession) { logger.fine(String.format("%s Session is ready", info.getLogName())); - PeerInfo localPeerInfo; - - synchronized (lock) { - debugTagTracer.checkPrecondition( - state != SessionState.NEW, - "session_open_wrong_state", - "Got session open response before session started"); - debugTagTracer.checkPrecondition( - state != SessionState.CLOSED, - "session_open_wrong_state", - "Got session open response after session was closed"); + debugTagTracer.checkPrecondition( + state != SessionState.NEW, + "session_open_wrong_state", + "Got session open response before session started"); + debugTagTracer.checkPrecondition( + state != SessionState.CLOSED, + "session_open_wrong_state", + "Got session open response after session was closed"); - if (state != SessionState.STARTING) { - logger.fine(String.format("Stream was already %s when session open was received", state)); - return; - } - localPeerInfo = stream.getPeerInfo(); - updateState(SessionState.READY); + if (state != SessionState.STARTING) { + logger.fine(String.format("Stream was already %s when session open was received", state)); + return; } + PeerInfo localPeerInfo = stream.getPeerInfo(); + updateState(SessionState.READY); tracer.onOpen(localPeerInfo); sessionListener.onReady(openSession); + scheduleHeartbeatCheck(); } private void 
handleSessionParamsResponse(SessionParametersResponse resp) { - synchronized (lock) { - if (state.phase >= SessionState.CLOSING.phase) { - logger.fine( - String.format("Stream was already %s when session params were received", state)); - return; - } + if (state.phase >= SessionState.CLOSING.phase) { + logger.fine(String.format("Stream was already %s when session params were received", state)); + return; + } - if (!sessionParameters.equals(resp)) { - this.sessionParameters = resp; - this.heartbeatInterval = - Duration.ofMillis(Durations.toMillis(sessionParameters.getKeepAlive())); - logger.log( - Level.CONFIG, - () -> - String.format( - "%s session params changed: %s", - info.getLogName(), - TextFormat.printer().emittingSingleLine(true).printToString(resp))); - } + if (!sessionParameters.equals(resp)) { + this.sessionParameters = resp; + this.heartbeatInterval = + Duration.ofMillis(Durations.toMillis(sessionParameters.getKeepAlive())); + logger.log( + Level.CONFIG, + () -> + String.format( + "%s session params changed: %s", + info.getLogName(), + TextFormat.printer().emittingSingleLine(true).printToString(resp))); } } @@ -444,46 +474,40 @@ private void handleVRpcResponse(VirtualRpcResponse vrpc) { // TODO: when stream is supported this should be updated to the next expected time instead of // session life time this.nextHeartbeat = clock.instant().plus(FUTURE_TIME); - VRpcImpl localRpc; - VRpcResult localCancel; - - boolean needsClose; - - synchronized (lock) { - if (state.phase > SessionState.CLOSING.phase) { - debugTagTracer.record( - TelemetryConfiguration.Level.WARN, "session_closed_discard_vrpc_response"); - logger.warning( - String.format( - "%s Discarding vRPC error because session is past the CLOSING phase with the" - + " reason: %s", - info.getLogName(), closeReason)); - return; - } - debugTagTracer.checkPrecondition( - state == SessionState.READY || state == SessionState.CLOSING, - "session_vrpc_response_wrong_state", - "Unexpected vRPC response when 
session is %s", - state); - debugTagTracer.checkPrecondition( - currentRpc != null, "session_vrpc_null", "Got vRPC response but current vRPC is unset"); - debugTagTracer.checkPrecondition( - currentRpc.rpcId == vrpc.getRpcId(), - "session_vrpc_id_mismatch", - "Got vRPC response for the wrong vRPC: expect: %s, actual: %s", - currentRpc.rpcId, - vrpc.getRpcId()); - - // reset state of the current rpc - localCancel = currentCancel; - currentCancel = null; - localRpc = currentRpc; - // TODO: handle multiplexing - currentRpc = null; - needsClose = (state == SessionState.CLOSING); + if (state.phase > SessionState.CLOSING.phase) { + debugTagTracer.record( + TelemetryConfiguration.Level.WARN, "session_closed_discard_vrpc_response"); + logger.warning( + String.format( + "%s Discarding vRPC error because session is past the CLOSING phase with the" + + " reason: %s", + info.getLogName(), closeReason)); + return; } + debugTagTracer.checkPrecondition( + state == SessionState.READY || state == SessionState.CLOSING, + "session_vrpc_response_wrong_state", + "Unexpected vRPC response when session is %s", + state); + debugTagTracer.checkPrecondition( + currentRpc != null, "session_vrpc_null", "Got vRPC response but current vRPC is unset"); + debugTagTracer.checkPrecondition( + currentRpc.rpcId == vrpc.getRpcId(), + "session_vrpc_id_mismatch", + "Got vRPC response for the wrong vRPC: expect: %s, actual: %s", + currentRpc.rpcId, + vrpc.getRpcId()); + + // reset state of the current rpc + VRpcResult localCancel = currentCancel; + currentCancel = null; + VRpcImpl localRpc = currentRpc; + // TODO: handle multiplexing + currentRpc = null; + boolean needsClose = (state == SessionState.CLOSING); + if (localCancel != null) { tracer.onVRpcClose(localCancel.getStatus().getCode()); localRpc.handleError(localCancel); @@ -491,12 +515,8 @@ private void handleVRpcResponse(VirtualRpcResponse vrpc) { tracer.onVRpcClose(Status.OK.getCode()); localRpc.handleResponse(vrpc); } - if (needsClose) { - 
synchronized (lock) { - if (state == SessionState.CLOSING) { - startGracefulClose(); - } - } + if (needsClose && state == SessionState.CLOSING) { + startGracefulClose(); } } @@ -505,62 +525,54 @@ private void handleHeartBeatResponse(HeartbeatResponse ignored) { } private void handleSessionRefreshConfigResponse(SessionRefreshConfig config) { - synchronized (lock) { - Metadata grpcMetadata = new Metadata(); - config - .getMetadataList() - .forEach( - entry -> - grpcMetadata.put( - Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER), - entry.getValue().toStringUtf8())); - openParams = OpenParams.create(grpcMetadata, config.getOptimizedOpenRequest()); - openParamsUpdated = true; - } + Metadata grpcMetadata = new Metadata(); + config + .getMetadataList() + .forEach( + entry -> + grpcMetadata.put( + Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER), + entry.getValue().toStringUtf8())); + openParams = OpenParams.create(grpcMetadata, config.getOptimizedOpenRequest()); + openParamsUpdated = true; } private void handleVRpcErrorResponse(ErrorResponse error) { // Skips the heartbeat check when there's no active vrpc on the session this.nextHeartbeat = clock.instant().plus(FUTURE_TIME); - VRpcImpl localRpc; - boolean needsClose; - VRpcResult localCancel; + if (state.phase > SessionState.CLOSING.phase) { + debugTagTracer.record( + TelemetryConfiguration.Level.WARN, "session_closed_discard_vrpc_response"); + logger.warning( + String.format( + "%s Discarding vRPC error because session is past the CLOSING phase with the" + + " reason: %s, error was: %s", + info.getLogName(), closeReason, error)); + return; + } - synchronized (lock) { - if (state.phase > SessionState.CLOSING.phase) { - debugTagTracer.record( - TelemetryConfiguration.Level.WARN, "session_closed_discard_vrpc_response"); - logger.warning( - String.format( - "%s Discarding vRPC error because session is past the CLOSING phase with the" - + " reason: %s, error was: %s", - info.getLogName(), 
closeReason, error)); - return; - } + debugTagTracer.checkPrecondition( + state == SessionState.READY || state == SessionState.CLOSING, + "session_vrpc_response_wrong_state", + "Unexpected vRPC response when session is %s", + state); - debugTagTracer.checkPrecondition( - state == SessionState.READY || state == SessionState.CLOSING, - "session_vrpc_response_wrong_state", - "Unexpected vRPC response when session is %s", - state); - - debugTagTracer.checkPrecondition( - currentRpc != null, "session_vrpc_null", "Got vRPC response but current vRPC is unset"); - debugTagTracer.checkPrecondition( - currentRpc.rpcId == error.getRpcId(), - "session_vrpc_id_mismatch", - "Got vRPC response for the wrong vRPC: expect: %s, actual: %s", - currentRpc.rpcId, - error.getRpcId()); - - // reset the state of the current rpc - localCancel = currentCancel; - currentCancel = null; - localRpc = currentRpc; - currentRpc = null; - needsClose = (state == SessionState.CLOSING); - } + debugTagTracer.checkPrecondition( + currentRpc != null, "session_vrpc_null", "Got vRPC response but current vRPC is unset"); + debugTagTracer.checkPrecondition( + currentRpc.rpcId == error.getRpcId(), + "session_vrpc_id_mismatch", + "Got vRPC response for the wrong vRPC: expect: %s, actual: %s", + currentRpc.rpcId, + error.getRpcId()); + + // reset the state of the current rpc + VRpcResult localCancel = currentCancel; + currentCancel = null; + VRpcImpl localRpc = currentRpc; + currentRpc = null; + boolean needsClose = (state == SessionState.CLOSING); if (localCancel != null) { tracer.onVRpcClose(localCancel.getStatus().getCode()); @@ -569,42 +581,35 @@ private void handleVRpcErrorResponse(ErrorResponse error) { tracer.onVRpcClose(Status.fromCodeValue(error.getStatus().getCode()).getCode()); localRpc.handleError(VRpcResult.createServerError(error)); } - if (needsClose) { - synchronized (lock) { - if (state == SessionState.CLOSING) { - startGracefulClose(); - } - } + if (needsClose && state == SessionState.CLOSING) 
{ + startGracefulClose(); } } private void handleGoAwayResponse(GoAwayResponse goAwayResponse) { - synchronized (lock) { - if (state.phase >= SessionState.CLOSING.phase) { - debugTagTracer.record(TelemetryConfiguration.Level.WARN, "session_go_away_ignored"); - logger.warning( - String.format( - "Session error: %s Ignoring goaway because session is %s", - info.getLogName(), state)); - return; - } + if (state.phase >= SessionState.CLOSING.phase) { + debugTagTracer.record(TelemetryConfiguration.Level.WARN, "session_go_away_ignored"); + logger.warning( + String.format( + "Session error: %s Ignoring goaway because session is %s", info.getLogName(), state)); + return; + } - debugTagTracer.checkPrecondition( - state.phase >= SessionState.STARTING.phase, - "session_go_away_wrong_state", - "Unexpected goaway when session is %s", - state); + debugTagTracer.checkPrecondition( + state.phase >= SessionState.STARTING.phase, + "session_go_away_wrong_state", + "Unexpected goaway when session is %s", + state); - updateState(SessionState.CLOSING); - closeReason = - CloseSessionRequest.newBuilder() - .setReason(CloseSessionReason.CLOSE_SESSION_REASON_GOAWAY) - .setDescription( - "Server sent GO_AWAY_" + goAwayResponse.getReason().toUpperCase(Locale.ENGLISH)) - .build(); - if (currentRpc == null) { - startGracefulClose(); - } + updateState(SessionState.CLOSING); + closeReason = + CloseSessionRequest.newBuilder() + .setReason(CloseSessionReason.CLOSE_SESSION_REASON_GOAWAY) + .setDescription( + "Server sent GO_AWAY_" + goAwayResponse.getReason().toUpperCase(Locale.ENGLISH)) + .build(); + if (currentRpc == null) { + startGracefulClose(); } sessionListener.onGoAway(goAwayResponse); } @@ -615,50 +620,44 @@ private void handleUnknownResponseMessage(SessionResponse message) { } private void dispatchStreamClosed(Status status, Metadata trailers) { - SessionState prevState; - VRpcImpl localVRpc; + SessionState prevState = state; - PeerInfo localPeerInfo; - synchronized (lock) { - prevState = 
state; + if (!status.isOk()) { + String augmentedDescription = + Optional.ofNullable(status.getDescription()).map(d -> d + ". ").orElse("") + + "PeerInfo: " + + formatPeerInfo(getPeerInfo()); - if (!status.isOk()) { - String augmentedDescription = - Optional.ofNullable(status.getDescription()).map(d -> d + ". ").orElse("") - + "PeerInfo: " - + formatPeerInfo(getPeerInfo()); + status = status.withDescription(augmentedDescription); + } - status = status.withDescription(augmentedDescription); - } + if (state == SessionState.WAIT_SERVER_CLOSE) { + logger.fine(String.format("%s closed normally with status %s", info.getLogName(), status)); + } else { + debugTagTracer.record(TelemetryConfiguration.Level.WARN, "session_abnormal_close"); + // Unexpected path + String msg = + String.format( + "Session error: %s session closed unexpectedly in state %s. Status: %s", + info.getLogName(), state, status); + logger.warning(msg); - if (state == SessionState.WAIT_SERVER_CLOSE) { - logger.fine(String.format("%s closed normally with status %s", info.getLogName(), status)); - } else { - debugTagTracer.record(TelemetryConfiguration.Level.WARN, "session_abnormal_close"); - // Unexpected path - String msg = - String.format( - "Session error: %s session closed unexpectedly in state %s. 
Status: %s", - info.getLogName(), state, status); - logger.warning(msg); - - if (state == SessionState.CLOSED) { - return; - } - - closeReason = - CloseSessionRequest.newBuilder() - .setReason(CloseSessionReason.CLOSE_SESSION_REASON_ERROR) - .setDescription("Unexpected session close with status: " + status.getCode()) - .build(); + if (state == SessionState.CLOSED) { + return; } - localVRpc = currentRpc; - localPeerInfo = stream.getPeerInfo(); - currentRpc = null; - updateState(SessionState.CLOSED); + closeReason = + CloseSessionRequest.newBuilder() + .setReason(CloseSessionReason.CLOSE_SESSION_REASON_ERROR) + .setDescription("Unexpected session close with status: " + status.getCode()) + .build(); } + VRpcImpl localVRpc = currentRpc; + PeerInfo localPeerInfo = stream.getPeerInfo(); + currentRpc = null; + updateState(SessionState.CLOSED); + if (localVRpc != null) { try { localVRpc.handleSessionClose(VRpcResult.createRemoteTransportError(status, trailers)); @@ -677,7 +676,6 @@ private void dispatchStreamClosed(Status status, Metadata trailers) { sessionListener.onClose(prevState, status, trailers); } - @GuardedBy("lock") private void updateState(SessionState newState) { this.state = newState; this.lastStateChangedAt = clock.instant(); diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java index cba048fe0ce0..7c133c963441 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java @@ -16,15 +16,12 @@ package com.google.cloud.bigtable.data.v2.internal.session; -import static com.google.bigtable.v2.CloseSessionRequest.CloseSessionReason.CLOSE_SESSION_REASON_MISSED_HEARTBEAT; - import 
com.google.auto.value.AutoValue; import com.google.bigtable.v2.CloseSessionRequest; import com.google.bigtable.v2.PeerInfo; import com.google.cloud.bigtable.data.v2.internal.middleware.VRpc.VRpcResult; import com.google.cloud.bigtable.data.v2.internal.session.Session.SessionState; import com.google.common.annotations.VisibleForTesting; -import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; @@ -69,12 +66,6 @@ class SessionList { private final Set allSessions = new HashSet<>(); private final Set inUseSessions = new HashSet<>(); - private final CloseSessionRequest missedHeartbeatCloseRequest = - CloseSessionRequest.newBuilder() - .setReason(CLOSE_SESSION_REASON_MISSED_HEARTBEAT) - .setDescription("missed heartbeat") - .build(); - // pool level statistics across all the afes private final PoolStats poolStats = new PoolStats(); @@ -145,20 +136,6 @@ void prune() { } } - void checkHeartbeat(Clock clock) { - Instant now = clock.instant(); - inUseSessions.forEach( - handle -> { - if (now.isAfter(handle.getSession().getNextHeartbeat())) { - LOG.log( - Level.WARNING, - "Missed heartbeat for {0}, forcing session close", - handle.getSession().getLogName()); - handle.getSession().forceClose(missedHeartbeatCloseRequest); - } - }); - } - @NotThreadSafe class SessionHandle { private final Session session; diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java index 35884cb74319..47d856b5bd51 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java @@ -124,8 +124,6 @@ private enum PoolState { private final 
ScheduledFuture afeListPruneTask; - private final ScheduledFuture heartbeatMonitor; - private final ScheduledExecutorService executorService; @GuardedBy("this") @@ -140,6 +138,10 @@ private enum PoolState { private final DebugTagTracer debugTagTracer; + // @SuppressWarnings("GuardedBy"): error-prone flags writes to @GuardedBy("this") fields + // (sessions, picker, poolSizer, pendingRpcs, budget, retryCreateSessionFuture) inside the + // constructor without holding the monitor. This is safe because the object is not yet published + // to other threads — no external reference exists until the constructor returns. @SuppressWarnings("GuardedBy") public SessionPoolImpl( Metrics metrics, @@ -164,6 +166,7 @@ public SessionPoolImpl( createInitialBudget(configManager.getClientConfiguration())); } + // @SuppressWarnings("GuardedBy"): same rationale as the public constructor above. @SuppressWarnings("GuardedBy") @VisibleForTesting SessionPoolImpl( @@ -206,18 +209,7 @@ public SessionPoolImpl( // Watchdog checks for sessions in WAIT_SERVER_CLOSE state and runs every 5 minutes watchdog = new Watchdog(this, executorService, Duration.ofMinutes(5), sessions, debugTagTracer); - // Heartbeat monitor checks for sessions in READY state with active vRPCs and runs more - // frequently - heartbeatMonitor = - executorService.scheduleAtFixedRate( - () -> { - synchronized (SessionPoolImpl.this) { - sessions.checkHeartbeat(Clock.systemUTC()); - } - }, - SessionImpl.HEARTBEAT_CHECK_INTERVAL.toMillis(), - SessionImpl.HEARTBEAT_CHECK_INTERVAL.toMillis(), - TimeUnit.MILLISECONDS); + // Heartbeat monitoring is now done per-session via SessionImpl.sessionSyncContext. 
afeListPruneTask = executorService.scheduleAtFixedRate( () -> { @@ -253,6 +245,7 @@ public SessionPoolInfo getInfo() { public void close(CloseSessionRequest req) { configListenerHandle.close(); + List> toCancel; synchronized (this) { if (poolState == PoolState.CLOSED) { logger.fine(String.format("Tried to close a closed SessionPool %s", info.getLogName())); @@ -262,11 +255,9 @@ public void close(CloseSessionRequest req) { poolState = PoolState.CLOSED; - for (PendingVRpc pendingRpc : pendingRpcs) { - pendingRpc.cancel("SessionPool closed: " + req, null); - } + toCancel = new ArrayList<>(pendingRpcs); + pendingRpcs.clear(); afeListPruneTask.cancel(false); - heartbeatMonitor.cancel(false); if (retryCreateSessionFuture != null) { retryCreateSessionFuture.cancel(false); retryCreateSessionFuture = null; @@ -274,6 +265,10 @@ public void close(CloseSessionRequest req) { watchdog.close(); sessions.close(req); } + + for (PendingVRpc pendingRpc : toCancel) { + pendingRpc.cancel("SessionPool closed: " + req, null); + } } @Override @@ -345,7 +340,7 @@ private synchronized void createSession(OpenParams openParams) { try (Scope ignored = io.opentelemetry.context.Context.root().makeCurrent()) { SessionStream stream = factory.createNew(); - Session session = new SessionImpl(metrics, info, sessionNum++, stream); + Session session = new SessionImpl(metrics, info, sessionNum++, stream, executorService); SessionHandle handle = sessions.newHandle(session); Metadata localMd = new Metadata(); @@ -515,7 +510,7 @@ private void onSessionClose( status, trailers))); for (PendingVRpc vrpc : toBeClosed) { try { - vrpc.getListener().onClose(result); + vrpc.cancelWithResult(result); } catch (Throwable t) { logger.log(Level.WARNING, "Exception when closing request", t); } @@ -526,10 +521,6 @@ private void onSessionClose( @GuardedBy("this") private void tryDrainPendingRpcs() { while (!pendingRpcs.isEmpty()) { - if (pendingRpcs.peek().isCancelled) { - pendingRpcs.pop(); - continue; - } Optional 
handle = picker.pickSession(); if (!handle.isPresent()) { break; @@ -545,11 +536,8 @@ private void tryDrainPendingRpcs() { Iterator> iter = pendingRpcs.iterator(); while (iter.hasNext()) { PendingVRpc vrpc = iter.next(); - // vrpcs that have started on a session gets closed in SessionImpl. Do not double close. - if (!vrpc.isCancelled && vrpc.realCall == null) { - iter.remove(); - toBeClosed.add(vrpc); - } + iter.remove(); + toBeClosed.add(vrpc); } return toBeClosed; } @@ -570,7 +558,6 @@ public synchronized VRpc(desc); } - @GuardedBy("this") private VRpc newRealCall( VRpcDescriptor desc, SessionHandle handle) { @@ -635,10 +622,13 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { synchronized (SessionPoolImpl.this) { if (SessionPoolImpl.this.poolState != PoolState.STARTED) { - listener.onClose( - VRpcResult.createUncommitedError( - Status.UNAVAILABLE.withCause( - new IllegalStateException("SessionPool is closed")))); + ctx.getExecutor() + .execute( + () -> + listener.onClose( + VRpcResult.createUncommitedError( + Status.UNAVAILABLE.withCause( + new IllegalStateException("SessionPool is closed"))))); return; } pendingRpcs.add(this); @@ -660,9 +650,6 @@ public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { } } - // It's safe to call cancel on a vrpc more than once. It'll be a noop after the initial - // call. Cancelled vrpcs are removed from the pending vrpc queue the next time we - // drain the queue. @Override public void cancel(@Nullable String message, @Nullable Throwable cause) { Status status = Status.CANCELLED; @@ -675,30 +662,31 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { cancel(status, false); } - // Cancel could race with drainTo which sets the real call. Assign realCall to a NOOP_CALL - // so if drainTo gets called at the same time, it'll just get swallowed and we're only calling - // onClose once on the listener. 
The cancel could also come from deadline monitor when - // the deadline expires. In this case if the real call is already set, we want to real call - // to handle the deadline and return early. + // cancel() and drainTo() are sequenced via ctx.getExecutor() (a per-op SerializingExecutor), + // so isCancelled and realCall are owned exclusively by that executor — no pool lock needed. private void cancel(Status status, boolean onlyCancelPendingCall) { - boolean delegateToRealCall = true; synchronized (SessionPoolImpl.this) { - if (isCancelled) { - return; - } + pendingRpcs.remove(this); // eager removal; no-op if already drained + } + ctx.getExecutor().execute(() -> { + if (isCancelled) return; isCancelled = true; - if (realCall == null) { - this.realCall = NOOP_CALL; - delegateToRealCall = false; - } else if (onlyCancelPendingCall) { - return; + if (realCall != null) { + if (!onlyCancelPendingCall) { + realCall.cancel(status.getDescription(), status.getCause()); + } + } else { + listener.onClose(VRpcResult.createRejectedError(status)); } - } - if (delegateToRealCall) { - realCall.cancel(status.getDescription(), status.getCause()); - } else { - listener.onClose(VRpcResult.createRejectedError(status)); - } + }); + } + + void cancelWithResult(VRpcResult result) { + ctx.getExecutor().execute(() -> { + if (isCancelled) return; + isCancelled = true; + listener.onClose(result); + }); } @Override @@ -711,15 +699,18 @@ public void requestNext() { } private void drainTo(SessionHandle handle) { - synchronized (SessionPoolImpl.this) { - if (realCall == null) { - this.realCall = newRealCall(desc, handle); - } - } - this.realCall.start(req, ctx, listener); if (deadlineMonitor != null) { deadlineMonitor.cancel(false); } + ctx.getExecutor().execute(() -> { + if (isCancelled) { + SessionPoolImpl.this.onVRpcComplete( + handle, Duration.ZERO, VRpcResult.createRejectedError(Status.CANCELLED)); + return; + } + realCall = newRealCall(desc, handle); + realCall.start(req, ctx, listener); + 
}); } private VRpcListener getListener() { @@ -751,7 +742,10 @@ static class Watchdog implements Runnable { private final Clock clock; private final DebugTagTracer debugTagTracer; - // TODO: fix lock sharing + // The `lock` parameter is the pool-wide monitor (SessionPoolImpl.this). It is typed as Object + // because Watchdog is a static nested class and cannot reference the outer instance type in its + // constructor signature without creating a circular dependency. Phase 5 will replace this with + // a properly typed lock once the per-AFE sharding model is established. public Watchdog( Object lock, ScheduledExecutorService executor, @@ -831,15 +825,4 @@ public void close() { } } - private static final VRpc NOOP_CALL = - new VRpc() { - @Override - public void start(Object req, VRpcCallContext ctx, VRpcListener listener) {} - - @Override - public void cancel(@Nullable String message, @Nullable Throwable cause) {} - - @Override - public void requestNext() {} - }; } diff --git a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java index a2d841bd1a68..42088d1a96d3 100644 --- a/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java +++ b/java-bigtable/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/VRpcImpl.java @@ -54,7 +54,7 @@ class VRpcImpl rpc, VirtualRpcRequest payload); + void startRpc(VRpcImpl rpc, VirtualRpcRequest payload); void cancelRpc(long rpcId, @Nullable String message, @Nullable Throwable cause); } @@ -69,6 +69,8 @@ private enum State { private final VRpcDescriptor desc; final long rpcId; private VRpcListener listener; + // Stored in start() so that handle*() methods can dispatch to the op executor in Phase 4. 
+ private VRpcCallContext ctx; private PeerInfo peerInfo; private AtomicReference state; @@ -92,54 +94,44 @@ public VRpcImpl( @Override public void start(ReqT req, VRpcCallContext ctx, VRpcListener listener) { this.listener = listener; - - Status status; - boolean retryable = true; + this.ctx = ctx; if (!state.compareAndSet(State.NEW, State.STARTED)) { - status = Status.INTERNAL.withDescription("VRpc already started in state: " + state.get()); - retryable = false; - } else if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS) - < TimeUnit.MILLISECONDS.toMicros(1)) { - // Don't send RPCs that don't have any hope of succeeding - status = - Status.DEADLINE_EXCEEDED.withDescription("Remaining deadline is too short to send RPC"); - retryable = false; - } else { - Metadata vRpcMetadata = - Metadata.newBuilder() - .setAttemptNumber(ctx.getOperationInfo().getAttemptNumber()) - .setTraceparent(ctx.getTraceParent()) - .build(); - ctx.getTracer().onRequestSent(peerInfo); - status = - session.startRpc( - this, - VirtualRpcRequest.newBuilder() - .setRpcId(rpcId) - .setMetadata(vRpcMetadata) - .setDeadline( - Durations.fromNanos( - ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.NANOSECONDS))) - .setPayload(desc.encode(req)) - .build()); - // if status is not OK, the session might not be ready and the vRPC can be retried on a - // different session + VRpcResult result = + VRpcResult.createRejectedError( + Status.INTERNAL.withDescription("VRpc already started in state: " + state.get())); + ctx.getExecutor().execute(() -> listener.onClose(result)); + return; } - if (!status.isOk()) { - debugTagTracer.checkPrecondition( - state.compareAndSet(State.STARTED, State.CLOSED), - "vrpc_incorrect_start_state", - "VRpc has incorrect state. 
Expected to be started but was %s", - state); - // TODO: loop through the session executor - if (retryable) { - listener.onClose(VRpcResult.createUncommitedError(status)); - } else { - listener.onClose(VRpcResult.createRejectedError(status)); - } + if (ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.MICROSECONDS) + < TimeUnit.MILLISECONDS.toMicros(1)) { + state.set(State.CLOSED); + VRpcResult result = + VRpcResult.createRejectedError( + Status.DEADLINE_EXCEEDED.withDescription( + "Remaining deadline is too short to send RPC")); + ctx.getExecutor().execute(() -> listener.onClose(result)); + return; } + + Metadata vRpcMetadata = + Metadata.newBuilder() + .setAttemptNumber(ctx.getOperationInfo().getAttemptNumber()) + .setTraceparent(ctx.getTraceParent()) + .build(); + ctx.getTracer().onRequestSent(peerInfo); + session.startRpc( + this, + VirtualRpcRequest.newBuilder() + .setRpcId(rpcId) + .setMetadata(vRpcMetadata) + .setDeadline( + Durations.fromNanos( + ctx.getOperationInfo().getDeadline().timeRemaining(TimeUnit.NANOSECONDS))) + .setPayload(desc.encode(req)) + .build()); + // Session delivers startRpc errors asynchronously via handleError() on ctx.getExecutor() } void handleSessionClose(VRpcResult result) { @@ -147,8 +139,7 @@ void handleSessionClose(VRpcResult result) { logger.warning("tried to close a vRPC after it was already closed state: " + state.get()); return; } - - listener.onClose(result); + ctx.getExecutor().execute(() -> listener.onClose(result)); } void handleResponse(VirtualRpcResponse response) { @@ -167,30 +158,32 @@ void handleResponse(VirtualRpcResponse response) { // Right now, vrpc streaming & cancellation is not supported, so notifying SessionImpl is // unnecessary. 
In the future handleResponse will need to notify that Session that the user // was already notified of the error and no further notifications should be delivered - VRpcResult result = + VRpcResult decodeError = VRpcResult.createLocalTransportError( Status.INTERNAL.withDescription("Failed to decode VRpc payload").withCause(e)); - listener.onClose(result); - return; - } - - try { - listener.onMessage(resp); - } catch (Throwable e) { - VRpcResult result = VRpcResult.createUserError(e); - listener.onClose(result); + ctx.getExecutor().execute(() -> listener.onClose(decodeError)); return; } - listener.onClose(VRpcResult.createServerOk(response)); + RespT finalResp = resp; + ctx.getExecutor() + .execute( + () -> { + try { + listener.onMessage(finalResp); + } catch (Throwable e) { + listener.onClose(VRpcResult.createUserError(e)); + return; + } + listener.onClose(VRpcResult.createServerOk(response)); + }); } void handleError(VRpcResult result) { if (state.getAndSet(State.CLOSED) == State.CLOSED) { return; } - - listener.onClose(result); + ctx.getExecutor().execute(() -> listener.onClose(result)); } @Override diff --git a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/ClientTest.java b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/ClientTest.java index 7a92e5eb608a..a0e4a26e2824 100644 --- a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/ClientTest.java +++ b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/ClientTest.java @@ -55,7 +55,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +@Timeout(30) public class ClientTest { private ClientConfiguration defaultConfig; diff --git 
a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/TableBaseTest.java b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/TableBaseTest.java index 54406abe51a8..f1fc2cf9766b 100644 --- a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/TableBaseTest.java +++ b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/api/TableBaseTest.java @@ -31,6 +31,7 @@ import com.google.cloud.bigtable.data.v2.internal.session.SessionPoolInfo; import com.google.cloud.bigtable.data.v2.internal.session.VRpcDescriptor; import com.google.protobuf.Message; +import com.google.common.util.concurrent.MoreExecutors; import io.grpc.Deadline; import io.grpc.Metadata; import java.util.concurrent.ScheduledExecutorService; @@ -38,10 +39,12 @@ import javax.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +@Timeout(30) @ExtendWith(MockitoExtension.class) public class TableBaseTest { @@ -70,7 +73,8 @@ public void setup() { VRpcDescriptor.READ_ROW, VRpcDescriptor.MUTATE_ROW, noopMetrics, - mockExecutor); + mockExecutor, + MoreExecutors.directExecutor()); deadline = Deadline.after(1, TimeUnit.MINUTES); f = new UnaryResponseFuture<>(); } diff --git a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/VRpcTracerTest.java b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/VRpcTracerTest.java index 62a802dfeb0d..a32cf4e61167 100644 --- a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/VRpcTracerTest.java +++ 
b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/csm/tracers/VRpcTracerTest.java @@ -67,6 +67,7 @@ import io.grpc.InsecureChannelCredentials; import io.grpc.Metadata; import io.grpc.Server; +import io.grpc.SynchronizationContext; import io.opencensus.stats.Stats; import io.opencensus.tags.Tags; import io.opentelemetry.api.common.Attributes; @@ -91,7 +92,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +@Timeout(30) public class VRpcTracerTest { private static final Correspondence METRIC_DATA_BY_NAME = Correspondence.transforming(MetricData::getName, "MetricData name"); @@ -157,7 +160,7 @@ void setUp() throws IOException { SessionFactory sessionFactory = new SessionFactory( channelPool, FakeDescriptor.FAKE_SESSION.getMethodDescriptor(), CallOptions.DEFAULT); - session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); } @AfterEach @@ -198,8 +201,10 @@ public void operationLatencyTest() throws Exception { // Test CompletableFuture opFinished = new CompletableFuture<>(); Stopwatch stopwatch = Stopwatch.createStarted(); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture userFuture = new UnaryResponseFuture<>(); MethodInfo methodInfo = MethodInfo.builder().setName("Bigtable.ReadRow").setStreaming(false).build(); @@ -217,7 +222,7 @@ public void onOperationFinish(VRpc.VRpcResult result) { }; retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(0).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, 
TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), userFuture); SessionFakeScriptedResponse response = userFuture.get(); assertThat(response).isEqualToDefaultInstance(); @@ -256,9 +261,11 @@ public void attemptLatencyTest() throws Exception { // Test Stopwatch stopwatch = Stopwatch.createStarted(); AtomicLong maxAttemptLatency = new AtomicLong(); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); DelayedVRpc delayedVRpc = new DelayedVRpc<>( - () -> new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor)); + () -> new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext)); UnaryResponseFuture userFuture = new UnaryResponseFuture<>(); MethodInfo methodInfo = MethodInfo.builder().setName("Bigtable.ReadRow").setStreaming(false).build(); @@ -275,7 +282,7 @@ public void onAttemptFinish(VRpc.VRpcResult result) { }; delayedVRpc.start( SessionFakeScriptedRequest.newBuilder().setTag(0).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), userFuture); long sessionDelay = 5L; @@ -315,8 +322,10 @@ public void retryCountTest() throws Exception { sessionListener.popUntil(OpenSessionResponse.class); // Test + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture f = new UnaryResponseFuture<>(); CompletableFuture opFinished = new CompletableFuture<>(); MethodInfo methodInfo = @@ -332,7 +341,7 @@ public void onOperationFinish(VRpc.VRpcResult 
result) { }; retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(0).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), f); SessionFakeScriptedResponse response = f.get(); assertThat(response).isEqualToDefaultInstance(); @@ -365,9 +374,13 @@ public void clientBlockingLatencySessionDelayTest() throws Exception { session.start(openSessionRequest, new Metadata(), sessionListener); // Test + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); DelayedVRpc delayedVRpc = new DelayedVRpc<>( - () -> new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor)); + () -> + new RetryingVRpc<>( + () -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext)); UnaryResponseFuture f = new UnaryResponseFuture<>(); CompletableFuture attemptFinished = new CompletableFuture<>(); MethodInfo methodInfo = @@ -386,7 +399,7 @@ public void onAttemptFinish(VRpc.VRpcResult result) { delayedVRpc.start( SessionFakeScriptedRequest.newBuilder().setTag(0).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), f); long sessionDelay = 200; Thread.sleep(sessionDelay); diff --git a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpcTest.java b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpcTest.java index b9af925d4871..6e96685a05fe 100644 --- a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpcTest.java +++ 
b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/middleware/RetryingVRpcTest.java @@ -58,6 +58,7 @@ import io.grpc.Metadata; import io.grpc.Server; import io.grpc.Status; +import io.grpc.SynchronizationContext; import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -66,11 +67,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +@Timeout(30) @ExtendWith(MockitoExtension.class) public class RetryingVRpcTest { private ScheduledExecutorService executor; @@ -121,7 +124,7 @@ void tearDown() { @Test void noRetryTest() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); OpenSessionRequest openSessionRequest = @@ -132,12 +135,14 @@ void noRetryTest() throws Exception { assertThat(sessionListener.popUntil(OpenSessionResponse.class)) .isInstanceOf(OpenSessionResponse.class); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture f = new UnaryResponseFuture<>(); retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(0).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, 
syncContext), f); assertThat(f.get()).isEqualTo(SessionFakeScriptedResponse.getDefaultInstance()); @@ -152,7 +157,7 @@ void noRetryTest() throws Exception { @Test public void retryServerError() throws Exception { int requestTag = 1; - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); @@ -198,12 +203,14 @@ public void retryServerError() throws Exception { assertThat(sessionListener.popUntil(OpenSessionResponse.class)) .isInstanceOf(OpenSessionResponse.class); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture f = new UnaryResponseFuture<>(); retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(requestTag).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), f); assertThat(f.get()).isEqualTo(SessionFakeScriptedResponse.getDefaultInstance()); @@ -218,7 +225,7 @@ public void retryServerError() throws Exception { @Test public void retryDeadlineRespectedTest() throws Exception { int requestTag = 1; - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); @@ -273,13 +280,18 @@ public void retryDeadlineRespectedTest() throws Exception { assertThat(sessionListener.popUntil(OpenSessionResponse.class)) 
.isInstanceOf(OpenSessionResponse.class); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture f = new UnaryResponseFuture<>(); retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(requestTag).build(), VRpc.VRpcCallContext.create( - Deadline.after(Durations.toMillis(deadline), TimeUnit.MILLISECONDS), true, tracer), + Deadline.after(Durations.toMillis(deadline), TimeUnit.MILLISECONDS), + true, + tracer, + syncContext), f); ExecutionException exception = assertThrows(ExecutionException.class, f::get); assertThat(exception).hasMessageThat().contains(errorMessage); @@ -295,7 +307,7 @@ public void retryDeadlineRespectedTest() throws Exception { @Test public void vRpcFailureTest() throws Exception { // vrpc error on the session should not close the stream - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); @@ -332,12 +344,14 @@ public void vRpcFailureTest() throws Exception { assertThat(sessionListener.popUntil(OpenSessionResponse.class)) .isInstanceOf(OpenSessionResponse.class); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture f = new UnaryResponseFuture<>(); retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(0).build(), - 
VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), f); VRpcException cause = (VRpcException) assertThrows(ExecutionException.class, () -> f.get()).getCause(); @@ -353,7 +367,7 @@ public void vRpcFailureTest() throws Exception { @Test void cancelInScheduledState() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); @@ -391,12 +405,14 @@ void cancelInScheduledState() throws Exception { assertThat(sessionListener.popUntil(OpenSessionResponse.class)) .isInstanceOf(OpenSessionResponse.class); + SynchronizationContext syncContext = + new SynchronizationContext((t, e) -> { throw new AssertionError("Unexpected exception", e); }); RetryingVRpc retrying = - new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor); + new RetryingVRpc<>(() -> session.newCall(FakeDescriptor.SCRIPTED), executor, syncContext); UnaryResponseFuture f = new UnaryResponseFuture<>(); retrying.start( SessionFakeScriptedRequest.newBuilder().setTag(1).build(), - VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), + VRpc.VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer, syncContext), f); // Wait for the first attempt to fail and get scheduled for retry diff --git a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java index 838cea0e2ffe..8ba96c607892 100644 --- a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java +++ 
b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionImplTest.java @@ -77,11 +77,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +@Timeout(30) @ExtendWith(MockitoExtension.class) public class SessionImplTest { private ScheduledExecutorService executor; @@ -133,7 +135,7 @@ void tearDown() { @Test void sessionSendAndCloseTest() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); OpenSessionRequest openSessionRequest = @@ -163,7 +165,7 @@ void sessionSendAndCloseTest() throws Exception { @Test void sessionCloseBeforeInit() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); OpenSessionRequest openSessionRequest = @@ -180,7 +182,7 @@ void sessionCloseBeforeInit() throws Exception { @Test void sessionGoAwayTest() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); Duration goAwayDelay = Duration.ofMillis(500); FakeSessionListener sessionListener = new FakeSessionListener(); @@ -268,7 +270,7 @@ void sessionGoAwayTest() throws Exception { @Test void streamErrorDuringRpcTest() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, 
sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); Status.Code actualCode = Status.Code.INTERNAL; @@ -337,7 +339,7 @@ void streamErrorDuringRpcTest() throws Exception { @Test void rpcErrorDuringRpcTest() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); com.google.rpc.Status expectedRpcStatus = com.google.rpc.Status.newBuilder() @@ -404,7 +406,7 @@ void rpcErrorDuringRpcTest() throws Exception { @Test void localErrorTest() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); FakeSessionListener sessionListener = new FakeSessionListener(); session.start( @@ -451,7 +453,7 @@ void testHeartbeat() throws Exception { Instant time = clock.instant(); - SessionImpl session = new SessionImpl(metrics, clock, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, clock, poolInfo, 0, sessionFactory.createNew(), executor); int keepAliveDurationMs = 150; @@ -490,8 +492,14 @@ void testHeartbeat() throws Exception { VRpcCallContext.create(Deadline.after(1, TimeUnit.MINUTES), true, tracer), f); - assertThat(session.getNextHeartbeat()) - .isEqualTo(time.plus(Duration.ofMillis(keepAliveDurationMs))); + // startRpc() is now async; poll until sessionSyncContext processes it + Instant expectedHeartbeat = time.plus(Duration.ofMillis(keepAliveDurationMs)); + Stopwatch sw = Stopwatch.createStarted(); + while (!session.getNextHeartbeat().equals(expectedHeartbeat) + && sw.elapsed(TimeUnit.SECONDS) < 5) { + Thread.sleep(10); + } + assertThat(session.getNextHeartbeat()).isEqualTo(expectedHeartbeat); 
assertThat(f.get()).isEqualTo(SessionFakeScriptedResponse.getDefaultInstance()); @@ -507,7 +515,7 @@ void testHeartbeat() throws Exception { @Test void testCancel() throws Exception { - SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew()); + SessionImpl session = new SessionImpl(metrics, poolInfo, 0, sessionFactory.createNew(), executor); int responseDelayMs = 200; // Configure the fake service to delay the response, giving us time to cancel it diff --git a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java index ea142d8a7449..79f7be193ee1 100644 --- a/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java +++ b/java-bigtable/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImplTest.java @@ -89,6 +89,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; import org.mockito.ArgumentCaptor; @@ -96,6 +97,7 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +@Timeout(30) @Nested @ExtendWith(MockitoExtension.class) public class SessionPoolImplTest {