diff --git a/CHANGELOG.md b/CHANGELOG.md index 748304184f..28a298be54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +### [2.11.2](https://github.com/googleapis/java-bigquery/compare/v2.11.1...v2.11.2) (2022-05-18) + + +### Bug Fixes + +* Flaky connenction close issue ([#2044](https://github.com/googleapis/java-bigquery/issues/2044)) ([9993717](https://github.com/googleapis/java-bigquery/commit/9993717d546c4039cb8c846787fdd131cc0c113f)) +* NPE issue with testMultipleRuns ([#2050](https://github.com/googleapis/java-bigquery/issues/2050)) ([251d468](https://github.com/googleapis/java-bigquery/commit/251d4686d22e0000982bcd891de68491326558fe)) + ### [2.11.1](https://github.com/googleapis/java-bigquery/compare/v2.11.0...v2.11.1) (2022-05-16) diff --git a/README.md b/README.md index 34a0ae2286..ef43a359ad 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ If you are using Maven without BOM, add this to your dependencies: com.google.cloud google-cloud-bigquery - 2.11.0 + 2.11.1 ``` @@ -59,13 +59,13 @@ implementation 'com.google.cloud:google-cloud-bigquery' If you are using Gradle without BOM, add this to your dependencies ```Groovy -implementation 'com.google.cloud:google-cloud-bigquery:2.11.0' +implementation 'com.google.cloud:google-cloud-bigquery:2.11.1' ``` If you are using SBT, add this to your dependencies ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.11.0" +libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.11.1" ``` ## Authentication diff --git a/benchmark/pom.xml b/benchmark/pom.xml index f3ac092bbe..186d89a51e 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -6,7 +6,7 @@ google-cloud-bigquery-parent com.google.cloud - 2.11.1 + 2.11.2 diff --git a/google-cloud-bigquery/pom.xml b/google-cloud-bigquery/pom.xml index c110be2115..049beb2249 100644 --- a/google-cloud-bigquery/pom.xml +++ b/google-cloud-bigquery/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.google.cloud google-cloud-bigquery - 2.11.1 + 2.11.2 jar BigQuery https://github.com/googleapis/java-bigquery @@ -11,7 +11,7 @@ com.google.cloud google-cloud-bigquery-parent - 2.11.1 + 2.11.2 google-cloud-bigquery diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java index b43615141c..be5174b260 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/ConnectionImpl.java @@ -87,7 +87,11 @@ class ConnectionImpl implements Connection { Executors.newFixedThreadPool(MAX_PROCESS_QUERY_THREADS_CNT); private final Logger logger = Logger.getLogger(this.getClass().getName()); private BigQueryReadClient bqReadClient; - private static final long EXECUTOR_TIMEOUT_SEC = 5; + private static final long EXECUTOR_TIMEOUT_SEC = 10; + private BlockingQueue> + bufferFvl; // initialized lazily iff we end up using the tabledata.list end point + private BlockingQueue + bufferRow; // initialized lazily iff we end up using Read API ConnectionImpl( ConnectionSettings connectionSettings, @@ -107,6 +111,19 @@ class ConnectionImpl implements Connection { : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000)); } + /** + * This method returns the number of records to be stored in the buffer and it ensures that it is + * between a reasonable range + * + * @return The max number of records to be stored in the buffer + */ + private int getBufferSize() { + return (connectionSettings == null + || connectionSettings.getNumBufferedRows() == null + || connectionSettings.getNumBufferedRows() < 10000 + ? 20000 + : Math.min(connectionSettings.getNumBufferedRows() * 2, 100000)); + } /** * Cancel method shutdowns the pageFetcher and producerWorker threads gracefully using interrupt. * The pageFetcher threat will not request for any subsequent threads after interrupting and @@ -119,12 +136,14 @@ class ConnectionImpl implements Connection { @BetaApi @Override public synchronized boolean close() throws BigQuerySQLException { + flagEndOfStream(); // an End of Stream flag in the buffer so that the `ResultSet.next()` stops + // advancing the cursor queryTaskExecutor.shutdownNow(); try { - queryTaskExecutor.awaitTermination( - EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS); // wait for the executor shutdown + if (queryTaskExecutor.awaitTermination(EXECUTOR_TIMEOUT_SEC, TimeUnit.SECONDS)) { + return true; + } // else queryTaskExecutor.isShutdown() will be returned outside this try block } catch (InterruptedException e) { - e.printStackTrace(); logger.log( Level.WARNING, "\n" + Thread.currentThread().getName() + " Exception while awaitTermination", @@ -234,8 +253,10 @@ public BigQueryResult executeSelect( BigQueryResult getResultSet( GetQueryResultsResponse firstPage, JobId jobId, String sql, Boolean hasQueryParameters) { if (firstPage.getJobComplete() - && firstPage.getTotalRows() - != null) { // firstPage.getTotalRows() is null if job is not complete + && firstPage.getTotalRows() != null + && firstPage.getSchema() + != null) { // firstPage.getTotalRows() is null if job is not complete. We need to make + // sure that the schema is not null, as it is required for the ResultSet return getSubsequentQueryResultsWithJob( firstPage.getTotalRows().longValue(), (long) firstPage.getRows().size(), @@ -328,7 +349,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) { BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats(jobId); // Keeps the deserialized records at the row level, which is consumed by BigQueryResult - BlockingQueue> buffer = new LinkedBlockingDeque<>(bufferSize); + bufferFvl = new LinkedBlockingDeque<>(getBufferSize()); // Keeps the parsed FieldValueLists BlockingQueue, Boolean>> pageCache = @@ -350,11 +371,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) { // throughput populateBufferAsync( - rpcResponseQueue, pageCache, buffer); // spawns a thread to populate the buffer + rpcResponseQueue, pageCache, bufferFvl); // spawns a thread to populate the buffer // This will work for pagination as well, as buffer is getting updated asynchronously return new BigQueryResultImpl>( - schema, numRows, buffer, bigQueryResultStats); + schema, numRows, bufferFvl, bigQueryResultStats); } @VisibleForTesting @@ -382,7 +403,7 @@ BigQueryResult processQueryResponseResults( BigQueryResultStats bigQueryResultStats = new BigQueryResultStatsImpl(queryStatistics, sessionInfo); - BlockingQueue> buffer = new LinkedBlockingDeque<>(bufferSize); + bufferFvl = new LinkedBlockingDeque<>(getBufferSize()); BlockingQueue, Boolean>> pageCache = new LinkedBlockingDeque<>( getPageCacheSize(connectionSettings.getNumBufferedRows(), schema)); @@ -399,10 +420,10 @@ BigQueryResult processQueryResponseResults( parseRpcDataAsync(results.getRows(), schema, pageCache, rpcResponseQueue); // Thread to populate the buffer (a blocking queue) shared with the consumer - populateBufferAsync(rpcResponseQueue, pageCache, buffer); + populateBufferAsync(rpcResponseQueue, pageCache, bufferFvl); return new BigQueryResultImpl>( - schema, numRows, buffer, bigQueryResultStats); + schema, numRows, bufferFvl, bigQueryResultStats); } @VisibleForTesting @@ -418,6 +439,11 @@ void runNextPageTaskAsync( while (pageToken != null) { // paginate for non null token if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { // do not process further pages and shutdown + logger.log( + Level.WARNING, + "\n" + + Thread.currentThread().getName() + + " Interrupted @ runNextPageTaskAsync"); break; } TableDataList tabledataList = tableDataListRpc(destinationTable, pageToken); @@ -430,12 +456,12 @@ void runNextPageTaskAsync( } rpcResponseQueue.put( Tuple.of( - null, - false)); // this will stop the parseDataTask as well in case of interrupt or - // when the pagination completes + null, false)); // this will stop the parseDataTask as well when the pagination + // completes } catch (Exception e) { throw new BigQueryException(0, e.getMessage(), e); - } + } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not + // have finished processing the records and even that will be interrupted }; queryTaskExecutor.execute(nextPageTask); } @@ -458,7 +484,9 @@ void parseRpcDataAsync( pageCache.put( Tuple.of(firstFieldValueLists, true)); // this is the first page which we have received. } catch (InterruptedException e) { - throw new BigQueryException(0, e.getMessage(), e); + logger.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync"); } // rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use @@ -468,6 +496,14 @@ void parseRpcDataAsync( try { boolean hasMorePages = true; while (hasMorePages) { + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown()) { // do not process further data and shutdown + logger.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync"); + break; + } + // no interrupt received till this point, continue processing Tuple rpcResponse = rpcResponseQueue.take(); TableDataList tabledataList = rpcResponse.x(); hasMorePages = rpcResponse.y(); @@ -480,55 +516,24 @@ void parseRpcDataAsync( } catch (InterruptedException e) { logger.log( Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted", + "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back } try { - pageCache.put(Tuple.of(null, false)); // no further pages + pageCache.put(Tuple.of(null, false)); // no further pages, graceful exit scenario } catch (InterruptedException e) { logger.log( Level.WARNING, - "\n" + Thread.currentThread().getName() + " Interrupted", + "\n" + Thread.currentThread().getName() + " Interrupted @ parseRpcDataAsync", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back - } + } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not + // have finished processing the records and even that will be interrupted }; queryTaskExecutor.execute(parseDataTask); } - /** - * This method is called when the current thread is interrupted, this communicates to ResultSet by - * adding a EoS - * - * @param buffer - */ - @InternalApi - void markEoS(BlockingQueue> buffer) { // package-private - try { - buffer.put(new EndOfFieldValueList()); // All the pages has been processed, put this marker - } catch (InterruptedException e) { - logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - } - } - - /** - * This method is called when the current thread is interrupted, this communicates to ResultSet by - * adding a isLast Row - * - * @param buffer - */ - @InternalApi - void markLast(BlockingQueue buffer) { // package-private - try { - buffer.put( - new BigQueryResultImpl.Row( - null, true)); // All the pages has been processed, put this marker - } catch (InterruptedException e) { - logger.log(Level.WARNING, "\n" + Thread.currentThread().getName() + " Interrupted", e); - } - } - @VisibleForTesting void populateBufferAsync( BlockingQueue> rpcResponseQueue, @@ -549,25 +554,21 @@ void populateBufferAsync( "\n" + Thread.currentThread().getName() + " Interrupted", e); // Thread might get interrupted while calling the Cancel method, which is // expected, so logging this instead of throwing the exception back - markEoS( - buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS + break; } if (Thread.currentThread().isInterrupted() + || queryTaskExecutor.isShutdown() || fieldValueLists == null) { // do not process further pages and shutdown (outerloop) - markEoS( - buffer); // Thread has been interrupted, communicate to ResultSet by adding EoS break; } for (FieldValueList fieldValueList : fieldValueLists) { try { - if (Thread.currentThread() - .isInterrupted()) { // do not process further pages and shutdown (inner loop) - markEoS( - buffer); // Thread has been interrupted, communicate to ResultSet by adding - // EoS + if (Thread.currentThread().isInterrupted() + || queryTaskExecutor + .isShutdown()) { // do not process further pages and shutdown (inner loop) break; } buffer.put(fieldValueList); @@ -576,24 +577,55 @@ void populateBufferAsync( } } } - try { - if (Thread.currentThread() - .isInterrupted()) { // clear the buffer for any outstanding records - rpcResponseQueue - .clear(); // IMP - so that if it's full then it unblocks and the interrupt logic - // could trigger - buffer.clear(); - } - markEoS(buffer); // All the pages has been processed, put this marker + buffer.put( + new EndOfFieldValueList()); // All the pages has been processed, put this marker + } catch (InterruptedException e) { + logger.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync", + e); } finally { - queryTaskExecutor.shutdownNow(); // Shutdown the thread pool + queryTaskExecutor + .shutdownNow(); // Shutdown the thread pool. All the records are now processed } }; queryTaskExecutor.execute(populateBufferRunnable); } + /** + * In an interrupt scenario, like when the background threads are still working and the user calls + * `connection.close() then we need to add an End of Stream flag in the buffer so that the + * `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync` + * method to do this as the `BlockingQueue.put()` call will error out after the interrupt is + * triggerred + */ + @InternalApi + void flagEndOfStream() { // package-private + try { + if (bufferFvl != null) { // that is tabledata.list endpoint is used + bufferFvl.put( + new EndOfFieldValueList()); // All the pages has been processed, put this marker + } else if (bufferRow != null) { + bufferRow.put( + new BigQueryResultImpl.Row( + null, true)); // All the pages has been processed, put this marker + } else { + logger.log( + Level.WARNING, + "\n" + + Thread.currentThread().getName() + + " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is close without executing a query"); + } + } catch (InterruptedException e) { + logger.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ flagEndOfStream", + e); + } + } + /* Helper method that parse and populate a page with TableRows */ private static Iterable getIterableFieldValueList( Iterable tableDataPb, final Schema schema) { @@ -781,17 +813,17 @@ BigQueryResult highThroughPutRead( ; ReadSession readSession = bqReadClient.createReadSession(builder.build()); - BlockingQueue buffer = new LinkedBlockingDeque<>(bufferSize); + bufferRow = new LinkedBlockingDeque<>(getBufferSize()); Map arrowNameToIndex = new HashMap<>(); // deserialize and populate the buffer async, so that the client isn't blocked processArrowStreamAsync( readSession, - buffer, + bufferRow, new ArrowRowReader(readSession.getArrowSchema(), arrowNameToIndex), schema); logger.log(Level.INFO, "\n Using BigQuery Read API"); - return new BigQueryResultImpl(schema, totalRows, buffer, stats); + return new BigQueryResultImpl(schema, totalRows, bufferRow, stats); } catch (IOException e) { throw BigQueryException.translateAndThrow(e); @@ -825,8 +857,18 @@ private void processArrowStreamAsync( } catch (Exception e) { throw BigQueryException.translateAndThrow(e); - } finally { - markLast(buffer); // marking end of stream + } finally { // logic needed for graceful shutdown + // marking end of stream + try { + buffer.put( + new BigQueryResultImpl.Row( + null, true)); // All the pages has been processed, put this marker + } catch (InterruptedException e) { + logger.log( + Level.WARNING, + "\n" + Thread.currentThread().getName() + " Interrupted @ markLast", + e); + } queryTaskExecutor.shutdownNow(); // Shutdown the thread pool } }; @@ -888,7 +930,6 @@ private void processRows( if (Thread.currentThread().isInterrupted() || queryTaskExecutor.isShutdown()) { // do not process and shutdown - markLast(buffer); // puts an isLast Row in the buffer for ResultSet to process break; // exit the loop, root will be cleared in the finally block } @@ -979,9 +1020,6 @@ boolean isFastQuerySupported() { @VisibleForTesting boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQueryParameters) { - - // TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job - // is not complete if ((totalRows == null || pageRows == null) && Boolean.TRUE.equals( connectionSettings @@ -990,7 +1028,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQuer return true; } - // Schema schema = Schema.fromPb(tableSchema); // Read API does not yet support Interval Type or QueryParameters if (containsIntervalType(schema) || hasQueryParameters) { logger.log(Level.INFO, "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI"); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 348749b467..dcfa5265f1 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -2652,12 +2652,11 @@ public void testConnectionClose() throws SQLException { int cnt = 0; while (rs.next()) { ++cnt; - if (cnt > 57000) { // breaking at 57K, query reads 300K + if (cnt == 57000) { // breaking at 57000th record, query reads 300K assertTrue(connection.close()); // we should be able to cancel the connection } } - assertTrue( - cnt < 60000); // Few extra records are still read (generally ~10) even after canceling, as + assertTrue(cnt < 100000); // Extra records are still read even after canceling, as // the backgrounds threads are still active while the interrupt occurs and the // buffer and pageCache are cleared } diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java index d672967b14..deabba59f8 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITNightlyBigQueryTest.java @@ -332,7 +332,31 @@ public void testIterateAndOrderDefaultConnSettings() throws SQLException { ++cnt; } assertEquals(LIMIT_RECS, cnt); // all the records were retrieved - connection.close(); + assertTrue(connection.close()); + } + + /* + This tests interrupts the execution in between and checks if it has been interrupted successfully while using ReadAPI + */ + @Test + public void testConnectionClose() throws SQLException { + Connection connection = bigquery.createConnection(); + BigQueryResult bigQueryResult = connection.executeSelect(QUERY); + logger.log(Level.INFO, "Query used: {0}", QUERY); + ResultSet rs = bigQueryResult.getResultSet(); + int cnt = 0; + while (rs.next()) { + ++cnt; + if (cnt == 50000) { // interrupt at 50K + assertTrue(connection.close()); + } + } + assertTrue( + LIMIT_RECS + > cnt); // we stopped at 50K but still we can expect additional records (typically ~100) + // to be retrieved + // as a number of records should have been already buffered. less than + // LIMIT_RECS should be retrieved } @Test diff --git a/pom.xml b/pom.xml index 32a0419375..bae36ea406 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.google.cloud google-cloud-bigquery-parent pom - 2.11.1 + 2.11.2 BigQuery Parent https://github.com/googleapis/java-bigquery @@ -118,7 +118,7 @@ com.google.cloud google-cloud-bigquery - 2.11.1 + 2.11.2 diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index 3ce7cebc4f..906f422e00 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -45,7 +45,7 @@ com.google.cloud google-cloud-bigquery - 2.11.0 + 2.11.1 diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index 3715d2b05f..0c35979670 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -44,7 +44,7 @@ com.google.cloud google-cloud-bigquery - 2.11.1 + 2.11.2 diff --git a/versions.txt b/versions.txt index e5caec1bb2..503185ac76 100644 --- a/versions.txt +++ b/versions.txt @@ -1,4 +1,4 @@ # Format: # module:released-version:current-version -google-cloud-bigquery:2.11.1:2.11.1 \ No newline at end of file +google-cloud-bigquery:2.11.2:2.11.2 \ No newline at end of file