diff --git a/CHANGELOG.md b/CHANGELOG.md
index 748304184f..28a298be54 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
# Changelog
+### [2.11.2](https://github.com/googleapis/java-bigquery/compare/v2.11.1...v2.11.2) (2022-05-18)
+
+
+### Bug Fixes
+
* Flaky connection close issue ([#2044](https://github.com/googleapis/java-bigquery/issues/2044)) ([9993717](https://github.com/googleapis/java-bigquery/commit/9993717d546c4039cb8c846787fdd131cc0c113f))
+* NPE issue with testMultipleRuns ([#2050](https://github.com/googleapis/java-bigquery/issues/2050)) ([251d468](https://github.com/googleapis/java-bigquery/commit/251d4686d22e0000982bcd891de68491326558fe))
+
### [2.11.1](https://github.com/googleapis/java-bigquery/compare/v2.11.0...v2.11.1) (2022-05-16)
diff --git a/README.md b/README.md
index 34a0ae2286..ef43a359ad 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,7 @@ If you are using Maven without BOM, add this to your dependencies:
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquery</artifactId>
-  <version>2.11.0</version>
+  <version>2.11.1</version>
```
@@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-bigquery'
If you are using Gradle without BOM, add this to your dependencies
```Groovy
-implementation 'com.google.cloud:google-cloud-bigquery:2.11.0'
+implementation 'com.google.cloud:google-cloud-bigquery:2.11.1'
```
If you are using SBT, add this to your dependencies
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.11.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.11.1"
```
## Authentication
diff --git a/benchmark/pom.xml b/benchmark/pom.xml
index f3ac092bbe..186d89a51e 100644
--- a/benchmark/pom.xml
+++ b/benchmark/pom.xml
@@ -6,7 +6,7 @@
    <artifactId>google-cloud-bigquery-parent</artifactId>
    <groupId>com.google.cloud</groupId>
-    <version>2.11.1</version>
+    <version>2.11.2</version>
diff --git a/google-cloud-bigquery/pom.xml b/google-cloud-bigquery/pom.xml
index c110be2115..049beb2249 100644
--- a/google-cloud-bigquery/pom.xml
+++ b/google-cloud-bigquery/pom.xml
@@ -3,7 +3,7 @@
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquery</artifactId>
-  <version>2.11.1</version>
+  <version>2.11.2</version>
  <packaging>jar</packaging>
  <name>BigQuery</name>
  <url>https://github.com/googleapis/java-bigquery</url>
@@ -11,7 +11,7 @@
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery-parent</artifactId>
-    <version>2.11.1</version>
+    <version>2.11.2</version>
google-cloud-bigquery
diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
index b43615141c..be5174b260 100644
--- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
+++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java
@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT);
private final Logger logger = Logger.getLogger(this.getClass().getName());
private BigQueryReadClient bqReadClient;
- private static final long EXECUTOR_TIMEOUT_SEC = 5;
+ private static final long EXECUTOR_TIMEOUT_SEC = 10;
+ private BlockingQueue<AbstractList<FieldValue>>
+ bufferFvl; // initialized lazily iff we end up using the tabledata.list end point
+ private BlockingQueue<BigQueryResultImpl.Row>
+ bufferRow; // initialized lazily iff we end up using Read API
ConnectionImpl(
ConnectionSettings connectionSettings,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
: Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
}
+ /**
+ * This method returns the number of records to be stored in the buffer and ensures that it
+ * falls within a reasonable range
+ *
+ * @return The max number of records to be stored in the buffer
+ */
+ private int getBufferSize() {
+ return (connectionSettings == null
+ || connectionSettings.getNumBufferedRows() == null
+ || connectionSettings.getNumBufferedRows() < 10000
+ ? 20000
+ : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000));
+ }
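To make the clamping above concrete, here is a minimal, self-contained sketch of the same buffer-sizing rule; the class and method names are illustrative and not part of the library:

```java
// Illustrative re-statement of the buffer-sizing rule above; not library code.
final class BufferSizeSketch {
  static int bufferSizeFor(Integer numBufferedRows) {
    if (numBufferedRows == null || numBufferedRows < 10_000) {
      return 20_000; // default when the hint is unset or too small
    }
    return Math.min(numBufferedRows * 2, 100_000); // double the hint, capped at 100K records
  }

  public static void main(String[] args) {
    System.out.println(bufferSizeFor(null)); // 20000
    System.out.println(bufferSizeFor(5_000)); // 20000
    System.out.println(bufferSizeFor(30_000)); // 60000
    System.out.println(bufferSizeFor(80_000)); // 100000
  }
}
```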
/**
* Cancel method shuts down the pageFetcher and producerWorker threads gracefully using interrupt.
* The pageFetcher thread will not request for any subsequent threads after interrupting and
@@ -119,12 +136,14 @@ class ConnectionImpl implements Connection {
@BetaApi
@Override
public synchronized boolean close() throws BigQuerySQLException {
+ flagEndOfStream(); // puts an End of Stream flag in the buffer so that `ResultSet.next()` stops
+ // advancing the cursor
queryTaskExecutor.shutdownNow();
try {
- queryTaskExecutor.awaitTermination(
- EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown
+ if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+ return true;
+ } // else queryTaskExecutor.isShutdown() will be returned outside this try block
} catch (InterruptedException e) {
- e.printStackTrace();
logger.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Exception while awaitTermination",
@@ -234,8 +253,10 @@ public BigQueryResult executeSelect(
BigQueryResult getResultSet(
GetQueryResultsResponse firstPage, JobId jobId, String sql, Boolean hasQueryParameters) {
if (firstPage.getJobComplete()
- && firstPage.getTotalRows()
- != null) { // firstPage.getTotalRows() is null if job is not complete
+ && firstPage.getTotalRows() != null
+ && firstPage.getSchema()
+ != null) { // firstPage.getTotalRows() is null if job is not complete. We need to make
+ // sure that the schema is not null, as it is required for the ResultSet
return getSubsequentQueryResultsWithJob(
firstPage.getTotalRows().longValue(),
(long) firstPage.getRows().size(),
@@ -328,7 +349,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId);
// Keeps the deserialized records at the row level, which is consumed by BigQueryResult
- BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
+ bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
// Keeps the parsed FieldValueLists
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
@@ -350,11 +371,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
// throughput
populateBufferAsync(
- rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer
+ rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer
// This will work for pagination as well, as buffer is getting updated asynchronously
return new BigQueryResultImpl<AbstractList<FieldValue>>(
- schema, numRows, buffer, bigQueryResultStats);
+ schema, numRows, bufferFvl, bigQueryResultStats);
}
@VisibleForTesting
@@ -382,7 +403,7 @@ BigQueryResult processQueryResponseResults(
BigQueryResultStats bigQueryResultStats =
new BigQueryResultStatsImpl(queryStatistics, sessionInfo);
- BlockingQueue<AbstractList<FieldValue>> buffer = new LinkedBlockingDeque<>(bufferSize);
+ bufferFvl = new LinkedBlockingDeque<>(getBufferSize());
BlockingQueue<Tuple<Iterable<FieldValueList>, Boolean>> pageCache =
new LinkedBlockingDeque<>(
getPageCacheSize(connectionSettings.getNumBufferedRows(), schema));
@@ -399,10 +420,10 @@ BigQueryResult processQueryResponseResults(
parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue);
// Thread to populate the buffer (a blocking queue) shared with the consumer
- populateBufferAsync(rpcResponseQueue, pageCache, buffer);
+ populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl);
return new BigQueryResultImpl<AbstractList<FieldValue>>(
- schema, numRows, buffer, bigQueryResultStats);
+ schema, numRows, bufferFvl, bigQueryResultStats);
}
@VisibleForTesting
@@ -418,6 +439,11 @@ void runNextPageTaskAsync(
while (pageToken != null) { // paginate for non null token
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown
+ logger.log(
+ Level.WARNING,
+ "\n"
+ + Thread.currentThread().getName()
+ + " Interrupted @ runNextPageTaskAsync");
break;
}
TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken);
@@ -430,12 +456,12 @@ void runNextPageTaskAsync(
}
rpcResponseQueue.put(
Tuple.of(
- null,
- false)); // this will stop the parseDataTask as well in case of interrupt or
- // when the pagination completes
+ null, false)); // this will stop the parseDataTask as well when the pagination
+ // completes
} catch (Exception e) {
throw new BigQueryException(0, e.getMessage(), e);
- }
+ } // We cannot do queryTaskExecutor.shutdownNow() here as the populate-buffer method may not
+ // have finished processing the records, and it too would be interrupted
};
queryTaskExecutor.execute(nextPageTask);
}
@@ -458,7 +484,9 @@ void parseRpcDataAsync(
pageCache.put(
Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received.
} catch (InterruptedException e) {
- throw new BigQueryException(0, e.getMessage(), e);
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
}
// rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
@@ -468,6 +496,14 @@ void parseRpcDataAsync(
try {
boolean hasMorePages = true;
while (hasMorePages) {
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()) { // do not process further data and shutdown
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync");
+ break;
+ }
+ // no interrupt received till this point, continue processing
Tuple<TableDataList, Boolean> rpcResponse = rpcResponseQueue.take();
TableDataList tabledataList = rpcResponse.x();
hasMorePages = rpcResponse.y();
@@ -480,55 +516,24 @@ void parseRpcDataAsync(
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Interrupted",
+ "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
}
try {
- pageCache.put(Tuple.of(null, false)); // no further pages
+ pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario
} catch (InterruptedException e) {
logger.log(
Level.WARNING,
- "\n" + Thread.currentThread().getName() + " Interrupted",
+ "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
- }
+ } // We cannot do queryTaskExecutor.shutdownNow() here as the populate-buffer method may not
+ // have finished processing the records, and it too would be interrupted
};
queryTaskExecutor.execute(parseDataTask);
}
- /**
- * This method is called when the current thread is interrupted, this communicates to ResultSet by
- * adding a EoS
- *
- * @param buffer
- */
- @InternalApi
- void markEoS(BlockingQueue<AbstractList<FieldValue>> buffer) { // package-private
- try {
- buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker
- } catch (InterruptedException e) {
- logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
- }
- }
-
- /**
- * This method is called when the current thread is interrupted, this communicates to ResultSet by
- * adding a isLast Row
- *
- * @param buffer
- */
- @InternalApi
- void markLast(BlockingQueue<BigQueryResultImpl.Row> buffer) { // package-private
- try {
- buffer.put(
- new BigQueryResultImpl.Row(
- null, true)); // All the pages has been processed, put this marker
- } catch (InterruptedException e) {
- logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e);
- }
- }
-
@VisibleForTesting
void populateBufferAsync(
BlockingQueue<Tuple<TableDataList, Boolean>> rpcResponseQueue,
@@ -549,25 +554,21 @@ void populateBufferAsync(
"\n" + Thread.currentThread().getName() + " Interrupted",
e); // Thread might get interrupted while calling the Cancel method, which is
// expected, so logging this instead of throwing the exception back
- markEoS(
- buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
+ break;
}
if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor.isShutdown()
|| fieldValueLists
== null) { // do not process further pages and shutdown (outerloop)
- markEoS(
- buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS
break;
}
for (FieldValueList fieldValueList : fieldValueLists) {
try {
- if (Thread.currentThread()
- .isInterrupted()) { // do not process further pages and shutdown (inner loop)
- markEoS(
- buffer); // Thread has been interrupted, communicate to ResultSet by adding
- // EoS
+ if (Thread.currentThread().isInterrupted()
+ || queryTaskExecutor
+ .isShutdown()) { // do not process further pages and shutdown (inner loop)
break;
}
buffer.put(fieldValueList);
@@ -576,24 +577,55 @@ void populateBufferAsync(
}
}
}
-
try {
- if (Thread.currentThread()
- .isInterrupted()) { // clear the buffer for any outstanding records
- rpcResponseQueue
- .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic
- // could trigger
- buffer.clear();
- }
- markEoS(buffer); // All the pages has been processed, put this marker
+ buffer.put(
+ new EndOfFieldValueList()); // All the pages have been processed, put this marker
+ } catch (InterruptedException e) {
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
+ e);
} finally {
- queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
+ queryTaskExecutor
+ .shutdownNow(); // Shutdown the thread pool. All the records are now processed
}
};
queryTaskExecutor.execute(populateBufferRunnable);
}
+ /**
+ * In an interrupt scenario, like when the background threads are still working and the user calls
+ * `connection.close()`, we need to add an End of Stream flag in the buffer so that
+ * `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
+ * method to do this as the `BlockingQueue.put()` call will error out after the interrupt is
+ * triggered
+ */
+ @InternalApi
+ void flagEndOfStream() { // package-private
+ try {
+ if (bufferFvl != null) { // i.e., the tabledata.list endpoint is used
+ bufferFvl.put(
+ new EndOfFieldValueList()); // All the pages have been processed, put this marker
+ } else if (bufferRow != null) {
+ bufferRow.put(
+ new BigQueryResultImpl.Row(
+ null, true)); // All the pages have been processed, put this marker
+ } else {
+ logger.log(
+ Level.WARNING,
+ "\n"
+ + Thread.currentThread().getName()
+ + " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is closed without executing a query");
+ }
+ } catch (InterruptedException e) {
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream",
+ e);
+ }
+ }
+
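The End of Stream marker behaves like a classic poison pill on a BlockingQueue: the producer enqueues a sentinel and the consumer stops when it sees it. A hedged, self-contained sketch of that pattern (the types and names below are illustrative, not the library's internals):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class PoisonPillSketch {
  private static final String END_OF_STREAM = "<EOS>"; // sentinel, analogous to EndOfFieldValueList
  private final BlockingQueue<String> buffer = new LinkedBlockingDeque<>(100);

  // Producer side: on close/interrupt, enqueue the sentinel so the consumer can stop.
  void flagEndOfStream() throws InterruptedException {
    buffer.put(END_OF_STREAM);
  }

  // Consumer side: analogous to ResultSet.next(), which stops advancing at the marker.
  void consume() throws InterruptedException {
    while (true) {
      String row = buffer.take();
      if (END_OF_STREAM.equals(row)) {
        break; // cursor stops advancing here
      }
      System.out.println(row);
    }
  }
}
```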
/* Helper method that parses and populates a page with TableRows */
private static Iterable<FieldValueList> getIterableFieldValueList(
Iterable<TableRow> tableDataPb, final Schema schema) {
@@ -781,17 +813,17 @@ BigQueryResult highThroughPutRead(
;
ReadSession readSession = bqReadClient.createReadSession(builder.build());
- BlockingQueue<BigQueryResultImpl.Row> buffer = new LinkedBlockingDeque<>(bufferSize);
+ bufferRow = new LinkedBlockingDeque<>(getBufferSize());
Map<String, Integer> arrowNameToIndex = new HashMap<>();
// deserialize and populate the buffer async, so that the client isn't blocked
processArrowStreamAsync(
readSession,
- buffer,
+ bufferRow,
new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex),
schema);
logger.log(Level.INFO, "\n Using BigQuery Read API");
- return new BigQueryResultImpl(schema, totalRows, buffer, stats);
+ return new BigQueryResultImpl(schema, totalRows, bufferRow, stats);
} catch (IOException e) {
throw BigQueryException.translateAndThrow(e);
@@ -825,8 +857,18 @@ private void processArrowStreamAsync(
} catch (Exception e) {
throw BigQueryException.translateAndThrow(e);
- } finally {
- markLast(buffer); // marking end of stream
+ } finally { // logic needed for graceful shutdown
+ // marking end of stream
+ try {
+ buffer.put(
+ new BigQueryResultImpl.Row(
+ null, true)); // All the pages have been processed, put this marker
+ } catch (InterruptedException e) {
+ logger.log(
+ Level.WARNING,
+ "\n" + Thread.currentThread().getName() + " Interrupted @ markLast",
+ e);
+ }
queryTaskExecutor.shutdownNow(); // Shutdown the thread pool
}
};
@@ -888,7 +930,6 @@ private void processRows(
if (Thread.currentThread().isInterrupted()
|| queryTaskExecutor.isShutdown()) { // do not process and shutdown
- markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process
break; // exit the loop, root will be cleared in the finally block
}
@@ -979,9 +1020,6 @@ boolean isFastQuerySupported() {
@VisibleForTesting
boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) {
-
- // TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
- // is not complete
if ((totalRows == null || pageRows == null)
&& Boolean.TRUE.equals(
connectionSettings
@@ -990,7 +1028,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQuer
return true;
}
- // Schema schema = Schema.fromPb(tableSchema);
// Read API does not yet support Interval Type or QueryParameters
if (containsIntervalType(schema) || hasQueryParameters) {
logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI");
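For orientation, the two guards visible in this hunk amount to roughly the following shape; this is a hedged sketch only, with the library's remaining row-count threshold checks elided and all names illustrative:

```java
// Sketch of the two guards above; the real useReadAPI method has further threshold checks.
final class ReadApiGuardSketch {
  static boolean useReadApi(
      Long totalRows,
      Long pageRows,
      boolean alwaysUseReadApi,
      boolean schemaHasIntervalType,
      boolean hasQueryParameters) {
    if ((totalRows == null || pageRows == null) && alwaysUseReadApi) {
      return true; // row counts are null while the job is incomplete; honor the explicit opt-in
    }
    if (schemaHasIntervalType || hasQueryParameters) {
      return false; // Read API does not yet support the Interval type or query parameters
    }
    // ... remaining row-count threshold checks elided ...
    return false;
  }
}
```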
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
index 348749b467..dcfa5265f1 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java
@@ -2652,12 +2652,11 @@ public void testConnectionClose() throws SQLException {
int cnt = 0;
while (rs.next()) {
++cnt;
- if (cnt > 57000) { // breaking at 57K, query reads 300K
+ if (cnt == 57000) { // breaking at 57000th record, query reads 300K
assertTrue(connection.close()); // we should be able to cancel the connection
}
}
- assertTrue(
- cnt < 60000); // Few extra records are still read (generally ~10) even after canceling, as
+ assertTrue(cnt < 100000); // Extra records are still read even after canceling, as
// the background threads are still active while the interrupt occurs and the
// buffer and pageCache are cleared
}
diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
index d672967b14..deabba59f8 100644
--- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
+++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java
@@ -332,7 +332,31 @@ public void testIterateAndOrderDefaultConnSettings() throws SQLException {
++cnt;
}
assertEquals(LIMIT_RECS, cnt); // all the records were retrieved
- connection.close();
+ assertTrue(connection.close());
+ }
+
+ /*
+ This test interrupts the execution midway and checks whether it has been interrupted successfully while using the Read API
+ */
+ @Test
+ public void testConnectionClose() throws SQLException {
+ Connection connection = bigquery.createConnection();
+ BigQueryResult bigQueryResult = connection.executeSelect(QUERY);
+ logger.log(Level.INFO, "Query used: {0}", QUERY);
+ ResultSet rs = bigQueryResult.getResultSet();
+ int cnt = 0;
+ while (rs.next()) {
+ ++cnt;
+ if (cnt == 50000) { // interrupt at 50K
+ assertTrue(connection.close());
+ }
+ }
+ assertTrue(
+ LIMIT_RECS
+ > cnt); // we stopped at 50K, but we can still expect a few additional records (typically
+ // ~100) to be retrieved, as a number of records should already have been buffered;
+ // fewer than LIMIT_RECS should be retrieved overall
}
@Test
diff --git a/pom.xml b/pom.xml
index 32a0419375..bae36ea406 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquery-parent</artifactId>
  <packaging>pom</packaging>
-  <version>2.11.1</version>
+  <version>2.11.2</version>
  <name>BigQuery Parent</name>
  <url>https://github.com/googleapis/java-bigquery</url>
@@ -118,7 +118,7 @@
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-bigquery</artifactId>
-      <version>2.11.1</version>
+      <version>2.11.2</version>
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index 3ce7cebc4f..906f422e00 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -45,7 +45,7 @@
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-bigquery</artifactId>
-      <version>2.11.0</version>
+      <version>2.11.1</version>
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index 3715d2b05f..0c35979670 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -44,7 +44,7 @@
      <groupId>com.google.cloud</groupId>
      <artifactId>google-cloud-bigquery</artifactId>
-      <version>2.11.1</version>
+      <version>2.11.2</version>
diff --git a/versions.txt b/versions.txt
index e5caec1bb2..503185ac76 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,4 +1,4 @@
# Format:
# module:released-version:current-version
-google-cloud-bigquery:2.11.1:2.11.1
\ No newline at end of file
+google-cloud-bigquery:2.11.2:2.11.2
\ No newline at end of file