Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit 9993717

Browse files
fix: Flaky connection close issue (#2044)
* Added EoS mark in populate buffer. changed log level to Fine. Minor refactor * Updated count assertion @ testConnectionClose * Updated condition to trigger `connection.close` at testConnectionClose * Added and wired flagEndOfStream. Refactored and improved Thread interrupt logic * Add testConnectionClose for checking connection close while using Read API * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 251d468 commit 9993717

3 files changed

Lines changed: 142 additions & 84 deletions

File tree

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java

Lines changed: 115 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
8787
Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
8888
private final Logger logger = Logger.getLogger(this.getClass().getName());
8989
private BigQueryReadClient bqReadClient;
90-
private static final long EXECUTOR_TIMEOUT_SEC = 5;
90+
private static final long EXECUTOR_TIMEOUT_SEC = 10;
91+
private BlockingQueue<AbstractList<FieldValue>>
92+
bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
93+
private BlockingQueue<BigQueryResultImpl.Row>
94+
bufferRow; // initialized lazily iff we end up using Read API
9195

9296
ConnectionImpl(
9397
ConnectionSettings connectionSettings,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
107111
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
108112
}
109113

114+
/**
115+
* This method returns the number of records to be stored in the buffer and it ensures that it is
116+
* between a reasonable range
117+
*
118+
* @return The max number of records to be stored in the buffer
119+
*/
120+
private int getBufferSize() {
121+
return (connectionSettings == null
122+
|| connectionSettings.getNumBufferedRows() == null
123+
|| connectionSettings.getNumBufferedRows() < 10000
124+
? 20000
125+
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
126+
}
110127
/**
111128
* Cancel method shutdowns the pageFetcher and producerWorker threads gracefully using interrupt.
112129
* The pageFetcher thread will not request any subsequent pages after interrupting and
@@ -119,12 +136,14 @@ class ConnectionImpl implements Connection {
119136
@BetaApi
120137
@Override
121138
public synchronized boolean close() throws BigQuerySQLException {
139+
flagEndOfStream(); // an End of Stream flag in the buffer so that the `ResultSet.next()` stops
140+
// advancing the cursor
122141
queryTaskExecutor.shutdownNow();
123142
try {
124-
queryTaskExecutor.awaitTermination(
125-
EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
143+
if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
144+
return true;
145+
} // else queryTaskExecutor.isShutdown() will be returned outside this try block
126146
} catch (InterruptedException e) {
127-
e.printStackTrace();
128147
logger.log(
129148
Level.WARNING,
130149
"\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
@@ -330,7 +349,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
330349
BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);
331350

332351
// Keeps the deserialized records at the row level, which is consumed by BigQueryResult
333-
BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
352+
bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
334353

335354
// Keeps the parsed FieldValueLists
336355
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
@@ -352,11 +371,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
352371
// throughput
353372

354373
populateBufferAsync(
355-
rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
374+
rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer
356375

357376
// This will work for pagination as well, as buffer is getting updated asynchronously
358377
return new BigQueryResultImpl<AbstractList<FieldValue>>(
359-
schema, numRows, buffer, bigQueryResultStats);
378+
schema, numRows, bufferFvl, bigQueryResultStats);
360379
}
361380

362381
@VisibleForTesting
@@ -384,7 +403,7 @@ BigQueryResult processQueryResponseResults(
384403
BigQueryResultStats bigQueryResultStats =
385404
new BigQueryResultStatsImpl(queryStatistics, sessionInfo);
386405

387-
BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
406+
bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
388407
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
389408
new LinkedBlockingDeque<>(
390409
getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
@@ -401,10 +420,10 @@ BigQueryResult processQueryResponseResults(
401420
parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);
402421

403422
// Thread to populate the buffer (a blocking queue) shared with the consumer
404-
populateBufferAsync(rpcResponseQueue, pageCache, buffer);
423+
populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl);
405424

406425
return new BigQueryResultImpl<AbstractList<FieldValue>>(
407-
schema, numRows, buffer, bigQueryResultStats);
426+
schema, numRows, bufferFvl, bigQueryResultStats);
408427
}
409428

410429
@VisibleForTesting
@@ -420,6 +439,11 @@ void runNextPageTaskAsync(
420439
while (pageToken != null) { // paginate for non null token
421440
if (Thread.currentThread().isInterrupted()
422441
|| queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
442+
logger.log(
443+
Level.WARNING,
444+
"\n"
445+
+ Thread.currentThread().getName()
446+
+ " Interrupted @ runNextPageTaskAsync");
423447
break;
424448
}
425449
TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
@@ -432,12 +456,12 @@ void runNextPageTaskAsync(
432456
}
433457
rpcResponseQueue.put(
434458
Tuple.of(
435-
null,
436-
false)); // this will stop the parseDataTask as well in case of interrupt or
437-
// when the pagination completes
459+
null, false)); // this will stop the parseDataTask as well when the pagination
460+
// completes
438461
} catch (Exception e) {
439462
throw new BigQueryException(0, e.getMessage(), e);
440-
}
463+
} // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
464+
// have finished processing the records and even that will be interrupted
441465
};
442466
queryTaskExecutor.execute(nextPageTask);
443467
}
@@ -460,7 +484,9 @@ void parseRpcDataAsync(
460484
pageCache.put(
461485
Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
462486
} catch (InterruptedException e) {
463-
throw new BigQueryException(0, e.getMessage(), e);
487+
logger.log(
488+
Level.WARNING,
489+
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
464490
}
465491

466492
// rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
@@ -470,6 +496,14 @@ void parseRpcDataAsync(
470496
try {
471497
boolean hasMorePages = true;
472498
while (hasMorePages) {
499+
if (Thread.currentThread().isInterrupted()
500+
|| queryTaskExecutor.isShutdown()) { // do not process further data and shutdown
501+
logger.log(
502+
Level.WARNING,
503+
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
504+
break;
505+
}
506+
// no interrupt received till this point, continue processing
473507
Tuple<TableDataList, Boolean> rpcResponse = rpcResponseQueue.take();
474508
TableDataList tabledataList = rpcResponse.x();
475509
hasMorePages = rpcResponse.y();
@@ -482,55 +516,24 @@ void parseRpcDataAsync(
482516
} catch (InterruptedException e) {
483517
logger.log(
484518
Level.WARNING,
485-
"\n" + Thread.currentThread().getName() + " Interrupted",
519+
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
486520
e); // Thread might get interrupted while calling the Cancel method, which is
487521
// expected, so logging this instead of throwing the exception back
488522
}
489523
try {
490-
pageCache.put(Tuple.of(null, false)); // no further pages
524+
pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario
491525
} catch (InterruptedException e) {
492526
logger.log(
493527
Level.WARNING,
494-
"\n" + Thread.currentThread().getName() + " Interrupted",
528+
"\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
495529
e); // Thread might get interrupted while calling the Cancel method, which is
496530
// expected, so logging this instead of throwing the exception back
497-
}
531+
} // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
532+
// have finished processing the records and even that will be interrupted
498533
};
499534
queryTaskExecutor.execute(parseDataTask);
500535
}
501536

502-
/**
503-
* This method is called when the current thread is interrupted, this communicates to ResultSet by
504-
* adding a EoS
505-
*
506-
* @param buffer
507-
*/
508-
@InternalApi
509-
void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
510-
try {
511-
buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
512-
} catch (InterruptedException e) {
513-
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
514-
}
515-
}
516-
517-
/**
518-
* This method is called when the current thread is interrupted, this communicates to ResultSet by
519-
* adding a isLast Row
520-
*
521-
* @param buffer
522-
*/
523-
@InternalApi
524-
void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
525-
try {
526-
buffer.put(
527-
new BigQueryResultImpl.Row(
528-
null, true)); // All the pages has been processed, put this marker
529-
} catch (InterruptedException e) {
530-
logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
531-
}
532-
}
533-
534537
@VisibleForTesting
535538
void populateBufferAsync(
536539
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
@@ -551,25 +554,21 @@ void populateBufferAsync(
551554
"\n" + Thread.currentThread().getName() + " Interrupted",
552555
e); // Thread might get interrupted while calling the Cancel method, which is
553556
// expected, so logging this instead of throwing the exception back
554-
markEoS(
555-
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
557+
break;
556558
}
557559

558560
if (Thread.currentThread().isInterrupted()
561+
|| queryTaskExecutor.isShutdown()
559562
|| fieldValueLists
560563
== null) { // do not process further pages and shutdown (outerloop)
561-
markEoS(
562-
buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
563564
break;
564565
}
565566

566567
for (FieldValueList fieldValueList : fieldValueLists) {
567568
try {
568-
if (Thread.currentThread()
569-
.isInterrupted()) { // do not process further pages and shutdown (inner loop)
570-
markEoS(
571-
buffer); // Thread has been interrupted, communicate to ResultSet by adding
572-
// EoS
569+
if (Thread.currentThread().isInterrupted()
570+
|| queryTaskExecutor
571+
.isShutdown()) { // do not process further pages and shutdown (inner loop)
573572
break;
574573
}
575574
buffer.put(fieldValueList);
@@ -578,24 +577,55 @@ void populateBufferAsync(
578577
}
579578
}
580579
}
581-
582580
try {
583-
if (Thread.currentThread()
584-
.isInterrupted()) { // clear the buffer for any outstanding records
585-
rpcResponseQueue
586-
.clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
587-
// could trigger
588-
buffer.clear();
589-
}
590-
markEoS(buffer); // All the pages has been processed, put this marker
581+
buffer.put(
582+
new EndOfFieldValueList()); // All the pages has been processed, put this marker
583+
} catch (InterruptedException e) {
584+
logger.log(
585+
Level.WARNING,
586+
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
587+
e);
591588
} finally {
592-
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
589+
queryTaskExecutor
590+
.shutdownNow(); // Shutdown the thread pool. All the records are now processed
593591
}
594592
};
595593

596594
queryTaskExecutor.execute(populateBufferRunnable);
597595
}
598596

597+
/**
598+
* In an interrupt scenario, like when the background threads are still working and the user calls
599+
* `connection.close() then we need to add an End of Stream flag in the buffer so that the
600+
* `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
601+
* method to do this as the `BlockingQueue.put()` call will error out after the interrupt is
602+
* triggerred
603+
*/
604+
@InternalApi
605+
void flagEndOfStream() { // package-private
606+
try {
607+
if (bufferFvl != null) { // that is tabledata.list endpoint is used
608+
bufferFvl.put(
609+
new EndOfFieldValueList()); // All the pages has been processed, put this marker
610+
} else if (bufferRow != null) {
611+
bufferRow.put(
612+
new BigQueryResultImpl.Row(
613+
null, true)); // All the pages has been processed, put this marker
614+
} else {
615+
logger.log(
616+
Level.WARNING,
617+
"\n"
618+
+ Thread.currentThread().getName()
619+
+ " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is close without executing a query");
620+
}
621+
} catch (InterruptedException e) {
622+
logger.log(
623+
Level.WARNING,
624+
"\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream",
625+
e);
626+
}
627+
}
628+
599629
/* Helper method that parse and populate a page with TableRows */
600630
private static Iterable<FieldValueList> getIterableFieldValueList(
601631
Iterable<TableRow> tableDataPb, final Schema schema) {
@@ -783,17 +813,17 @@ BigQueryResult highThroughPutRead(
783813
;
784814

785815
ReadSession readSession = bqReadClient.createReadSession(builder.build());
786-
BlockingQueue<BigQueryResultImpl.Row> buffer = new LinkedBlockingDeque<>(bufferSize);
816+
bufferRow = new LinkedBlockingDeque<>(getBufferSize());
787817
Map<String, Integer> arrowNameToIndex = new HashMap<>();
788818
// deserialize and populate the buffer async, so that the client isn't blocked
789819
processArrowStreamAsync(
790820
readSession,
791-
buffer,
821+
bufferRow,
792822
new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
793823
schema);
794824

795825
logger.log(Level.INFO, "\n Using BigQuery Read API");
796-
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, buffer, stats);
826+
return new BigQueryResultImpl<BigQueryResultImpl.Row>(schema, totalRows, bufferRow, stats);
797827

798828
} catch (IOException e) {
799829
throw BigQueryException.translateAndThrow(e);
@@ -827,8 +857,18 @@ private void processArrowStreamAsync(
827857

828858
} catch (Exception e) {
829859
throw BigQueryException.translateAndThrow(e);
830-
} finally {
831-
markLast(buffer); // marking end of stream
860+
} finally { // logic needed for graceful shutdown
861+
// marking end of stream
862+
try {
863+
buffer.put(
864+
new BigQueryResultImpl.Row(
865+
null, true)); // All the pages has been processed, put this marker
866+
} catch (InterruptedException e) {
867+
logger.log(
868+
Level.WARNING,
869+
"\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
870+
e);
871+
}
832872
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
833873
}
834874
};
@@ -890,7 +930,6 @@ private void processRows(
890930

891931
if (Thread.currentThread().isInterrupted()
892932
|| queryTaskExecutor.isShutdown()) { // do not process and shutdown
893-
markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
894933
break; // exit the loop, root will be cleared in the finally block
895934
}
896935

@@ -981,9 +1020,6 @@ boolean isFastQuerySupported() {
9811020

9821021
@VisibleForTesting
9831022
boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {
984-
985-
// TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
986-
// is not complete
9871023
if ((totalRows == null || pageRows == null)
9881024
&& Boolean.TRUE.equals(
9891025
connectionSettings
@@ -992,7 +1028,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQuer
9921028
return true;
9931029
}
9941030

995-
// Schema schema = Schema.fromPb(tableSchema);
9961031
// Read API does not yet support Interval Type or QueryParameters
9971032
if (containsIntervalType(schema) || hasQueryParameters) {
9981033
logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2652,12 +2652,11 @@ public void testConnectionClose() throws SQLException {
26522652
int cnt = 0;
26532653
while (rs.next()) {
26542654
++cnt;
2655-
if (cnt > 57000) { // breaking at 57K, query reads 300K
2655+
if (cnt == 57000) { // breaking at 57000th record, query reads 300K
26562656
assertTrue(connection.close()); // we should be able to cancel the connection
26572657
}
26582658
}
2659-
assertTrue(
2660-
cnt < 60000); // Few extra records are still read (generally ~10) even after canceling, as
2659+
assertTrue(cnt < 100000); // Extra records are still read even after canceling, as
26612660
// the backgrounds threads are still active while the interrupt occurs and the
26622661
// buffer and pageCache are cleared
26632662
}

0 commit comments

Comments
 (0)