feat(bigquery-jdbc): implement connection scoped thread pools#13380

Open
Neenu1995 wants to merge 1 commit into main from conn-level-thread-pool-2

@Neenu1995 Neenu1995 commented Jun 5, 2026

This PR refactors the threading model of the BigQuery JDBC driver to achieve per-connection execution isolation, proper logging context propagation, and reliable resource cleanup. It removes the shared static thread executor and transient pools, replacing them with a connection-scoped execution service and standard JVM concurrency primitives.

Key Changes

1. Per-Connection Thread Isolation

  • Added a dynamically sized, connection-scoped ExecutorService instance (backed by a custom MdcThreadPoolExecutor) to BigQueryConnection.
  • Sizing of the thread pool is controlled by the user-defined metadataFetchThreadCount connection property.
  • Statements and ResultSets running on a connection now submit all queries, pagination, and queue-population tasks to the connection's dedicated executor, eliminating thread contamination across connections.
  • Automatically shuts down and awaits termination of the connection executor during Connection.close().
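A minimal sketch of the ownership model described above, not the PR's actual code. `SketchConnection` is a hypothetical stand-in for BigQueryConnection, and a fixed-size pool is assumed here because the text does not show how the dynamic sizing is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BigQueryConnection; only the executor lifecycle is shown.
final class SketchConnection implements AutoCloseable {
  private final ExecutorService executorService;

  SketchConnection(int metadataFetchThreadCount) {
    // Assumption: a fixed-size pool sized from the connection property.
    this.executorService =
        new ThreadPoolExecutor(
            metadataFetchThreadCount,
            metadataFetchThreadCount,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>());
  }

  // Statements and result sets would submit their background work here.
  ExecutorService getExecutorService() {
    return executorService;
  }

  @Override
  public void close() {
    // Stop accepting work, wait briefly, then force termination if needed.
    executorService.shutdown();
    try {
      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        executorService.shutdownNow();
      }
    } catch (InterruptedException e) {
      executorService.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for callers
    }
  }

  public static void main(String[] args) {
    SketchConnection a = new SketchConnection(2);
    SketchConnection b = new SketchConnection(4);
    a.close();
    // Closing one connection leaves the other connection's pool running.
    System.out.println(a.getExecutorService().isShutdown());
    System.out.println(b.getExecutorService().isShutdown());
    b.close();
  }
}
```

Because each connection owns its pool, closing one connection cannot cancel or delay work that belongs to another.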

2. Database Metadata Concurrency Hardening

  • Migrated all background metadata queries in BigQueryDatabaseMetaData (e.g., procedures, tables, columns, functions) to run tasks using the connection's executor pool rather than spawning transient thread pools.
  • Standardized task tracking by wrapping background runnable fetch loops in standard FutureTask<?> instances.
  • Removed the custom ThreadFuture adapter bridge, aligning metadata background tasks directly with the Future<?> representation required by BigQueryJsonResultSet.
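The FutureTask wrapping described above could look roughly like the following. This is an illustrative sketch only: `MetadataFetchSketch`, `startFetch`, and the dataset names are hypothetical, and the real fetch loops in BigQueryDatabaseMetaData are not shown in this description.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

final class MetadataFetchSketch {
  // Wraps a runnable fetch loop in a FutureTask and runs it on the given executor.
  static Future<?> startFetch(
      ExecutorService connectionExecutor, List<String> datasets, List<String> sink) {
    Runnable fetchLoop =
        () -> {
          for (String dataset : datasets) {
            if (Thread.currentThread().isInterrupted()) {
              return; // stop early when the task has been cancelled
            }
            sink.add(dataset + ".tables"); // placeholder for a real metadata call
          }
        };
    FutureTask<Void> task = new FutureTask<>(fetchLoop, null);
    connectionExecutor.execute(task);
    return task; // callers track and cancel the work through the Future
  }

  // Runs one fetch on a small pool and returns how many entries were produced.
  static int demo() {
    ExecutorService connectionExecutor = Executors.newFixedThreadPool(2);
    List<String> sink = new CopyOnWriteArrayList<>();
    try {
      Future<?> task =
          startFetch(connectionExecutor, Arrays.asList("dataset_a", "dataset_b"), sink);
      task.get(5, TimeUnit.SECONDS);
      return sink.size();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      connectionExecutor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Returning the task as a plain `Future<?>` is what lets a result set hold it without any adapter type.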

3. MDC Context Propagation and Logging

  • Integrated a customized thread pool executor (MdcThreadPoolExecutor) that intercepts tasks and wraps them in MdcFutureTask.
  • Automatically propagates the active connectionId MDC logging context from calling threads to background worker threads, ensuring parser and driver logging messages are correctly routed to connection-specific log files.
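One way such propagation can work is sketched below. It is not the PR's MdcThreadPoolExecutor: a plain `ThreadLocal` stands in for the logging MDC, and the class names `ContextFutureTask` and `ContextThreadPoolExecutor` are hypothetical. The idea shown is to capture the submitter's context when the task is created and install it on the worker thread for the duration of `run()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class MdcPropagationSketch {
  // Assumption: a ThreadLocal stands in for the driver's MDC context holder.
  static final ThreadLocal<String> CONNECTION_ID = new ThreadLocal<>();

  // Captures the submitting thread's context at construction time.
  static final class ContextFutureTask<V> extends FutureTask<V> {
    private final String capturedId = CONNECTION_ID.get();

    ContextFutureTask(Callable<V> callable) {
      super(callable);
    }

    ContextFutureTask(Runnable runnable, V value) {
      super(runnable, value);
    }

    @Override
    public void run() {
      String previous = CONNECTION_ID.get();
      CONNECTION_ID.set(capturedId);
      try {
        super.run();
      } finally {
        // Restore the worker's previous state so pooled threads do not leak context.
        if (previous == null) {
          CONNECTION_ID.remove();
        } else {
          CONNECTION_ID.set(previous);
        }
      }
    }
  }

  // Wraps every submitted task; newTaskFor runs on the submitting thread.
  static final class ContextThreadPoolExecutor extends ThreadPoolExecutor {
    ContextThreadPoolExecutor(int threads) {
      super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
      return new ContextFutureTask<>(callable);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
      return new ContextFutureTask<>(runnable, value);
    }
  }

  // Returns the connection id observed on the worker thread.
  static String demo() {
    ContextThreadPoolExecutor pool = new ContextThreadPoolExecutor(1);
    CONNECTION_ID.set("conn-42");
    try {
      Callable<String> readContext = CONNECTION_ID::get;
      return pool.submit(readContext).get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      CONNECTION_ID.remove();
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Note that in this sketch only tasks passed to `submit` or `invokeAll` go through `newTaskFor`; a task handed straight to `execute` would need to be wrapped separately.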

4. ResultSet Finalization Cleanups

  • Refactored BigQueryResultSetFinalizers (ArrowResultSetFinalizer and JsonResultSetFinalizer) to track background worker tasks as Future<?> rather than raw Thread objects.
  • Updated GC-polling daemon triggers to call Future.cancel(true) instead of Thread.interrupt() during garbage collection cleanup.
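The switch from Thread.interrupt() to Future.cancel(true) can be illustrated with a small standalone example. `CancelSketch` is hypothetical and the sleeping task only stands in for a worker blocked on a queue; the finalizer classes themselves are not reproduced here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelSketch {
  // Returns true if cancel(true) both marked the task cancelled and interrupted it.
  static boolean demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch finished = new CountDownLatch(1);
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    try {
      Future<?> worker =
          pool.submit(
              () -> {
                started.countDown();
                try {
                  Thread.sleep(60_000); // stands in for a blocking queue operation
                } catch (InterruptedException e) {
                  sawInterrupt.set(true);
                } finally {
                  finished.countDown();
                }
              });
      started.await();
      // Interrupts only the thread currently running this task; the pooled
      // thread itself stays available for other work afterwards.
      worker.cancel(true);
      finished.await(5, TimeUnit.SECONDS);
      return worker.isCancelled() && sawInterrupt.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With pooled threads this distinction matters: the cleanup code holds a handle to the task rather than to a thread that may later be running unrelated work.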

Verification

  • Added a new unit test suite BigQueryJdbcMdcTest to validate pool isolation, dynamic sizing, and MDC context propagation.
  • Verified compilation and executed the unit test suite; all 990 tests passed successfully.

@Neenu1995 Neenu1995 requested review from a team as code owners June 5, 2026 21:40

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors thread management in the BigQuery JDBC driver, replacing unmanaged threads and ad-hoc thread pools with a connection-scoped ExecutorService and using Future and FutureTask for task tracking and cancellation. Feedback on these changes highlights three key issues:

  1. Sharing a single executor service for both metadata fetching and query execution background tasks risks thread starvation deadlocks and performance degradation.
  2. Using a modulo check (queueSize % 10 == 0) for queue warnings in BigQueryJdbcMdc may lead to log flooding.
  3. The executor shutdown sequence in BigQueryConnection needs to be more robust so that it handles InterruptedException properly and prevents resource leaks.

this.connectionId = connection.getConnectionId();
this.bigQuery = connection.getBigQuery();
this.querySettings = generateBigQuerySettings();
this.queryTaskExecutor = connection.getExecutorService();

high

Sharing the connection's executorService (which is sized based on metadataFetchThreadCount) between metadata fetching and query execution background tasks (queryTaskExecutor) introduces a risk of thread starvation deadlocks and performance interference.

Why this is an issue:

  1. Deadlock Risk: Query execution background processing (e.g., in processJsonResultSet) concurrently submits multiple tasks to queryTaskExecutor (such as nextPageWorker and populateBufferWorker) which communicate via blocking queues. If metadataFetchThreadCount is configured to a low value (e.g., 1 or 2), or if the pool is saturated by concurrent metadata operations, these tasks can easily starve each other, leading to a deadlock where one task blocks on a queue while the other cannot be scheduled to consume from it.
  2. Resource Contention: Heavy metadata operations (like getColumns or getTables which scan multiple datasets in parallel) can saturate the shared pool, causing query execution background tasks to experience significant scheduling delays.
  3. Confusing Configuration: The metadataFetchThreadCount property is conceptually intended to control the parallelism of metadata retrieval, but it now implicitly caps the throughput of query execution background processing.
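The deadlock risk in item 1 can be reproduced in miniature. The sketch below is not the driver's code: `StarvationSketch` is hypothetical, and a producer and consumer sharing a one-slot queue stand in for nextPageWorker and populateBufferWorker. With a pool of one thread the producer blocks on the full queue while the consumer waits in the pool's task queue, so neither finishes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StarvationSketch {
  // Returns true if producer and consumer both finish before the timeouts expire.
  static boolean pipelineCompletes(int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1);
    try {
      Future<?> producer =
          pool.submit(
              () -> {
                for (int i = 0; i < 3; i++) {
                  buffer.put(i); // blocks while the buffer is full
                }
                return null;
              });
      Future<?> consumer =
          pool.submit(
              () -> {
                for (int i = 0; i < 3; i++) {
                  buffer.take(); // blocks while the buffer is empty
                }
                return null;
              });
      producer.get(1, TimeUnit.SECONDS);
      consumer.get(1, TimeUnit.SECONDS);
      return true;
    } catch (TimeoutException e) {
      return false; // the two tasks starved each other
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow(); // interrupt whatever is still blocked
    }
  }

  public static void main(String[] args) {
    System.out.println("pool size 1 completes: " + pipelineCompletes(1));
    System.out.println("pool size 2 completes: " + pipelineCompletes(2));
  }
}
```

The same stall can occur at larger pool sizes whenever every thread is occupied by tasks that are waiting on work still sitting in the queue.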

Recommendation:

Consider keeping a separate, dedicated bounded ExecutorService for query execution tasks (e.g., a bounded thread pool with a safe default size and defined maximum size), or ensure that the connection-scoped executor has a guaranteed minimum size and separate queues/pools to prevent starvation. Avoid using unbounded thread pools like Executors.newCachedThreadPool() to prevent resource exhaustion.

References
  1. For safety, use a bounded thread pool (e.g., ThreadPoolExecutor with a defined maximum size) instead of an unbounded one (e.g., Executors.newCachedThreadPool()), even if the current logic seems to limit concurrent tasks.

Comment on lines +113 to +119
int queueSize = getQueue().size();
if (queueSize > 0 && queueSize % 10 == 0) {
  LOG.warning(
      "Thread pool queue size reached "
          + queueSize
          + ". Tasks are waiting for threads to free up.");
}

medium

Checking queueSize % 10 == 0 to trigger a warning can lead to severe log flooding in high-throughput scenarios.

If the thread pool queue size fluctuates frequently around a multiple of 10 (e.g., bouncing between 9 and 10), a warning will be logged on almost every task submission. This can degrade performance due to logging overhead and clutter the application logs.

Recommendation:

Instead of a simple modulo check on every submission, consider using a rate-limiting mechanism for this warning (e.g., logging at most once every few seconds/minutes), or only log when the queue size crosses specific high-watermark thresholds (and log an info/debug message when it goes back down).
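A minimal sketch of the rate-limiting option, under the assumption that a simple time-based gate is acceptable. `RateLimitedWarning` and its method names are hypothetical and not part of the driver; the current time is passed in explicitly so the behavior is easy to test.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: allows at most one warning per interval.
final class RateLimitedWarning {
  private final long minIntervalNanos;
  private final AtomicLong lastLoggedNanos;

  RateLimitedWarning(long minInterval, TimeUnit unit, long startNanos) {
    this.minIntervalNanos = unit.toNanos(minInterval);
    // Start one interval in the past so the first warning is allowed.
    this.lastLoggedNanos = new AtomicLong(startNanos - minIntervalNanos);
  }

  // Returns true if the caller should emit the warning now.
  boolean tryAcquire(long nowNanos) {
    long last = lastLoggedNanos.get();
    // The compare-and-set ensures only one of several racing threads wins.
    return nowNanos - last >= minIntervalNanos
        && lastLoggedNanos.compareAndSet(last, nowNanos);
  }

  public static void main(String[] args) {
    RateLimitedWarning gate =
        new RateLimitedWarning(5, TimeUnit.SECONDS, System.nanoTime());
    int emitted = 0;
    for (int i = 0; i < 1_000; i++) {
      if (gate.tryAcquire(System.nanoTime())) {
        emitted++; // a real caller would log the queue size here
      }
    }
    System.out.println("warnings emitted: " + emitted);
  }
}
```

In the executor, the existing condition could then become a check that the queue is non-empty and that `tryAcquire(System.nanoTime())` returns true, which bounds the logging cost regardless of how the queue size fluctuates.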

Comment on lines +961 to +965
if (this.executorService != null) {
  this.executorService.shutdown();
  this.executorService.awaitTermination(10, TimeUnit.SECONDS);
  this.executorService.shutdownNow();
}

medium

If awaitTermination is interrupted, it throws an InterruptedException which will bypass the subsequent shutdownNow() call. This can leave running tasks active and cause thread/resource leaks.

Recommendation:

Use the standard robust shutdown pattern to ensure shutdownNow() is always called even if the wait is interrupted, and properly restore the interrupted status.

      if (this.executorService != null) {
        this.executorService.shutdown();
        try {
          if (!this.executorService.awaitTermination(10, TimeUnit.SECONDS)) {
            this.executorService.shutdownNow();
          }
        } catch (InterruptedException e) {
          this.executorService.shutdownNow();
          Thread.currentThread().interrupt();
        }
      }

References
  1. In Java, do not swallow InterruptedException. When catching it, restore the thread's interrupted status by calling Thread.currentThread().interrupt() and handle the interruption appropriately.
