@@ -87,7 +87,11 @@ class ConnectionImpl implements Connection {
8787 Executors .newFixedThreadPool (MAX_PROCESS_QUERY_THREADS_CNT );
8888 private final Logger logger = Logger .getLogger (this .getClass ().getName ());
8989 private BigQueryReadClient bqReadClient ;
90- private static final long EXECUTOR_TIMEOUT_SEC = 5 ;
90+ private static final long EXECUTOR_TIMEOUT_SEC = 10 ;
91+ private BlockingQueue <AbstractList <FieldValue >>
92+ bufferFvl ; // initialized lazily iff we end up using the tabledata.list end point
93+ private BlockingQueue <BigQueryResultImpl .Row >
94+ bufferRow ; // initialized lazily iff we end up using Read API
9195
9296 ConnectionImpl (
9397 ConnectionSettings connectionSettings ,
@@ -107,6 +111,19 @@ class ConnectionImpl implements Connection {
107111 : Math .min (connectionSettings .getNumBufferedRows () * 2 , 100000 ));
108112 }
109113
114+ /**
115+ * This method returns the number of records to be stored in the buffer and it ensures that it is
116+ * between a reasonable range
117+ *
118+ * @return The max number of records to be stored in the buffer
119+ */
120+ private int getBufferSize () {
121+ return (connectionSettings == null
122+ || connectionSettings .getNumBufferedRows () == null
123+ || connectionSettings .getNumBufferedRows () < 10000
124+ ? 20000
125+ : Math .min (connectionSettings .getNumBufferedRows () * 2 , 100000 ));
126+ }
110127 /**
111128 * Cancel method shutdowns the pageFetcher and producerWorker threads gracefully using interrupt.
112129 * The pageFetcher threat will not request for any subsequent threads after interrupting and
@@ -119,12 +136,14 @@ class ConnectionImpl implements Connection {
119136 @ BetaApi
120137 @ Override
121138 public synchronized boolean close () throws BigQuerySQLException {
139+ flagEndOfStream (); // an End of Stream flag in the buffer so that the `ResultSet.next()` stops
140+ // advancing the cursor
122141 queryTaskExecutor .shutdownNow ();
123142 try {
124- queryTaskExecutor .awaitTermination (
125- EXECUTOR_TIMEOUT_SEC , TimeUnit .SECONDS ); // wait for the executor shutdown
143+ if (queryTaskExecutor .awaitTermination (EXECUTOR_TIMEOUT_SEC , TimeUnit .SECONDS )) {
144+ return true ;
145+ } // else queryTaskExecutor.isShutdown() will be returned outside this try block
126146 } catch (InterruptedException e ) {
127- e .printStackTrace ();
128147 logger .log (
129148 Level .WARNING ,
130149 "\n " + Thread .currentThread ().getName () + " Exception while awaitTermination" ,
@@ -330,7 +349,7 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
330349 BigQueryResultStats bigQueryResultStats = getBigQueryResultSetStats (jobId );
331350
332351 // Keeps the deserialized records at the row level, which is consumed by BigQueryResult
333- BlockingQueue < AbstractList < FieldValue >> buffer = new LinkedBlockingDeque <>(bufferSize );
352+ bufferFvl = new LinkedBlockingDeque <>(getBufferSize () );
334353
335354 // Keeps the parsed FieldValueLists
336355 BlockingQueue <Tuple <Iterable <FieldValueList >, Boolean >> pageCache =
@@ -352,11 +371,11 @@ BigQueryResult tableDataList(GetQueryResultsResponse firstPage, JobId jobId) {
352371 // throughput
353372
354373 populateBufferAsync (
355- rpcResponseQueue , pageCache , buffer ); // spawns a thread to populate the buffer
374+ rpcResponseQueue , pageCache , bufferFvl ); // spawns a thread to populate the buffer
356375
357376 // This will work for pagination as well, as buffer is getting updated asynchronously
358377 return new BigQueryResultImpl <AbstractList <FieldValue >>(
359- schema , numRows , buffer , bigQueryResultStats );
378+ schema , numRows , bufferFvl , bigQueryResultStats );
360379 }
361380
362381 @ VisibleForTesting
@@ -384,7 +403,7 @@ BigQueryResult processQueryResponseResults(
384403 BigQueryResultStats bigQueryResultStats =
385404 new BigQueryResultStatsImpl (queryStatistics , sessionInfo );
386405
387- BlockingQueue < AbstractList < FieldValue >> buffer = new LinkedBlockingDeque <>(bufferSize );
406+ bufferFvl = new LinkedBlockingDeque <>(getBufferSize () );
388407 BlockingQueue <Tuple <Iterable <FieldValueList >, Boolean >> pageCache =
389408 new LinkedBlockingDeque <>(
390409 getPageCacheSize (connectionSettings .getNumBufferedRows (), schema ));
@@ -401,10 +420,10 @@ BigQueryResult processQueryResponseResults(
401420 parseRpcDataAsync (results .getRows (), schema , pageCache , rpcResponseQueue );
402421
403422 // Thread to populate the buffer (a blocking queue) shared with the consumer
404- populateBufferAsync (rpcResponseQueue , pageCache , buffer );
423+ populateBufferAsync (rpcResponseQueue , pageCache , bufferFvl );
405424
406425 return new BigQueryResultImpl <AbstractList <FieldValue >>(
407- schema , numRows , buffer , bigQueryResultStats );
426+ schema , numRows , bufferFvl , bigQueryResultStats );
408427 }
409428
410429 @ VisibleForTesting
@@ -420,6 +439,11 @@ void runNextPageTaskAsync(
420439 while (pageToken != null ) { // paginate for non null token
421440 if (Thread .currentThread ().isInterrupted ()
422441 || queryTaskExecutor .isShutdown ()) { // do not process further pages and shutdown
442+ logger .log (
443+ Level .WARNING ,
444+ "\n "
445+ + Thread .currentThread ().getName ()
446+ + " Interrupted @ runNextPageTaskAsync" );
423447 break ;
424448 }
425449 TableDataList tabledataList = tableDataListRpc (destinationTable , pageToken );
@@ -432,12 +456,12 @@ void runNextPageTaskAsync(
432456 }
433457 rpcResponseQueue .put (
434458 Tuple .of (
435- null ,
436- false )); // this will stop the parseDataTask as well in case of interrupt or
437- // when the pagination completes
459+ null , false )); // this will stop the parseDataTask as well when the pagination
460+ // completes
438461 } catch (Exception e ) {
439462 throw new BigQueryException (0 , e .getMessage (), e );
440- }
463+ } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
464+ // have finished processing the records and even that will be interrupted
441465 };
442466 queryTaskExecutor .execute (nextPageTask );
443467 }
@@ -460,7 +484,9 @@ void parseRpcDataAsync(
460484 pageCache .put (
461485 Tuple .of (firstFieldValueLists , true )); // this is the first page which we have received.
462486 } catch (InterruptedException e ) {
463- throw new BigQueryException (0 , e .getMessage (), e );
487+ logger .log (
488+ Level .WARNING ,
489+ "\n " + Thread .currentThread ().getName () + " Interrupted @ parseRpcDataAsync" );
464490 }
465491
466492 // rpcResponseQueue will get null tuple if Cancel method is called, so no need to explicitly use
@@ -470,6 +496,14 @@ void parseRpcDataAsync(
470496 try {
471497 boolean hasMorePages = true ;
472498 while (hasMorePages ) {
499+ if (Thread .currentThread ().isInterrupted ()
500+ || queryTaskExecutor .isShutdown ()) { // do not process further data and shutdown
501+ logger .log (
502+ Level .WARNING ,
503+ "\n " + Thread .currentThread ().getName () + " Interrupted @ parseRpcDataAsync" );
504+ break ;
505+ }
506+ // no interrupt received till this point, continue processing
473507 Tuple <TableDataList , Boolean > rpcResponse = rpcResponseQueue .take ();
474508 TableDataList tabledataList = rpcResponse .x ();
475509 hasMorePages = rpcResponse .y ();
@@ -482,55 +516,24 @@ void parseRpcDataAsync(
482516 } catch (InterruptedException e ) {
483517 logger .log (
484518 Level .WARNING ,
485- "\n " + Thread .currentThread ().getName () + " Interrupted" ,
519+ "\n " + Thread .currentThread ().getName () + " Interrupted @ parseRpcDataAsync " ,
486520 e ); // Thread might get interrupted while calling the Cancel method, which is
487521 // expected, so logging this instead of throwing the exception back
488522 }
489523 try {
490- pageCache .put (Tuple .of (null , false )); // no further pages
524+ pageCache .put (Tuple .of (null , false )); // no further pages, graceful exit scenario
491525 } catch (InterruptedException e ) {
492526 logger .log (
493527 Level .WARNING ,
494- "\n " + Thread .currentThread ().getName () + " Interrupted" ,
528+ "\n " + Thread .currentThread ().getName () + " Interrupted @ parseRpcDataAsync " ,
495529 e ); // Thread might get interrupted while calling the Cancel method, which is
496530 // expected, so logging this instead of throwing the exception back
497- }
531+ } // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
532+ // have finished processing the records and even that will be interrupted
498533 };
499534 queryTaskExecutor .execute (parseDataTask );
500535 }
501536
502- /**
503- * This method is called when the current thread is interrupted, this communicates to ResultSet by
504- * adding a EoS
505- *
506- * @param buffer
507- */
508- @ InternalApi
509- void markEoS (BlockingQueue <AbstractList <FieldValue >> buffer ) { // package-private
510- try {
511- buffer .put (new EndOfFieldValueList ()); // All the pages has been processed, put this marker
512- } catch (InterruptedException e ) {
513- logger .log (Level .WARNING , "\n " + Thread .currentThread ().getName () + " Interrupted" , e );
514- }
515- }
516-
517- /**
518- * This method is called when the current thread is interrupted, this communicates to ResultSet by
519- * adding a isLast Row
520- *
521- * @param buffer
522- */
523- @ InternalApi
524- void markLast (BlockingQueue <BigQueryResultImpl .Row > buffer ) { // package-private
525- try {
526- buffer .put (
527- new BigQueryResultImpl .Row (
528- null , true )); // All the pages has been processed, put this marker
529- } catch (InterruptedException e ) {
530- logger .log (Level .WARNING , "\n " + Thread .currentThread ().getName () + " Interrupted" , e );
531- }
532- }
533-
534537 @ VisibleForTesting
535538 void populateBufferAsync (
536539 BlockingQueue <Tuple <TableDataList , Boolean >> rpcResponseQueue ,
@@ -551,25 +554,21 @@ void populateBufferAsync(
551554 "\n " + Thread .currentThread ().getName () + " Interrupted" ,
552555 e ); // Thread might get interrupted while calling the Cancel method, which is
553556 // expected, so logging this instead of throwing the exception back
554- markEoS (
555- buffer ); // Thread has been interrupted, communicate to ResultSet by adding EoS
557+ break ;
556558 }
557559
558560 if (Thread .currentThread ().isInterrupted ()
561+ || queryTaskExecutor .isShutdown ()
559562 || fieldValueLists
560563 == null ) { // do not process further pages and shutdown (outerloop)
561- markEoS (
562- buffer ); // Thread has been interrupted, communicate to ResultSet by adding EoS
563564 break ;
564565 }
565566
566567 for (FieldValueList fieldValueList : fieldValueLists ) {
567568 try {
568- if (Thread .currentThread ()
569- .isInterrupted ()) { // do not process further pages and shutdown (inner loop)
570- markEoS (
571- buffer ); // Thread has been interrupted, communicate to ResultSet by adding
572- // EoS
569+ if (Thread .currentThread ().isInterrupted ()
570+ || queryTaskExecutor
571+ .isShutdown ()) { // do not process further pages and shutdown (inner loop)
573572 break ;
574573 }
575574 buffer .put (fieldValueList );
@@ -578,24 +577,55 @@ void populateBufferAsync(
578577 }
579578 }
580579 }
581-
582580 try {
583- if (Thread .currentThread ()
584- .isInterrupted ()) { // clear the buffer for any outstanding records
585- rpcResponseQueue
586- .clear (); // IMP - so that if it's full then it unblocks and the interrupt logic
587- // could trigger
588- buffer .clear ();
589- }
590- markEoS (buffer ); // All the pages has been processed, put this marker
581+ buffer .put (
582+ new EndOfFieldValueList ()); // All the pages has been processed, put this marker
583+ } catch (InterruptedException e ) {
584+ logger .log (
585+ Level .WARNING ,
586+ "\n " + Thread .currentThread ().getName () + " Interrupted @ populateBufferAsync" ,
587+ e );
591588 } finally {
592- queryTaskExecutor .shutdownNow (); // Shutdown the thread pool
589+ queryTaskExecutor
590+ .shutdownNow (); // Shutdown the thread pool. All the records are now processed
593591 }
594592 };
595593
596594 queryTaskExecutor .execute (populateBufferRunnable );
597595 }
598596
597+ /**
598+ * In an interrupt scenario, like when the background threads are still working and the user calls
599+ * `connection.close() then we need to add an End of Stream flag in the buffer so that the
600+ * `ResultSet.next()` stops advancing the cursor. We cannot rely on the `populateBufferAsync`
601+ * method to do this as the `BlockingQueue.put()` call will error out after the interrupt is
602+ * triggerred
603+ */
604+ @ InternalApi
605+ void flagEndOfStream () { // package-private
606+ try {
607+ if (bufferFvl != null ) { // that is tabledata.list endpoint is used
608+ bufferFvl .put (
609+ new EndOfFieldValueList ()); // All the pages has been processed, put this marker
610+ } else if (bufferRow != null ) {
611+ bufferRow .put (
612+ new BigQueryResultImpl .Row (
613+ null , true )); // All the pages has been processed, put this marker
614+ } else {
615+ logger .log (
616+ Level .WARNING ,
617+ "\n "
618+ + Thread .currentThread ().getName ()
619+ + " Could not flag End of Stream, both the buffer types are null. This might happen when the connection is close without executing a query" );
620+ }
621+ } catch (InterruptedException e ) {
622+ logger .log (
623+ Level .WARNING ,
624+ "\n " + Thread .currentThread ().getName () + " Interrupted @ flagEndOfStream" ,
625+ e );
626+ }
627+ }
628+
599629 /* Helper method that parse and populate a page with TableRows */
600630 private static Iterable <FieldValueList > getIterableFieldValueList (
601631 Iterable <TableRow > tableDataPb , final Schema schema ) {
@@ -783,17 +813,17 @@ BigQueryResult highThroughPutRead(
783813 ;
784814
785815 ReadSession readSession = bqReadClient .createReadSession (builder .build ());
786- BlockingQueue < BigQueryResultImpl . Row > buffer = new LinkedBlockingDeque <>(bufferSize );
816+ bufferRow = new LinkedBlockingDeque <>(getBufferSize () );
787817 Map <String , Integer > arrowNameToIndex = new HashMap <>();
788818 // deserialize and populate the buffer async, so that the client isn't blocked
789819 processArrowStreamAsync (
790820 readSession ,
791- buffer ,
821+ bufferRow ,
792822 new ArrowRowReader (readSession .getArrowSchema (), arrowNameToIndex ),
793823 schema );
794824
795825 logger .log (Level .INFO , "\n Using BigQuery Read API" );
796- return new BigQueryResultImpl <BigQueryResultImpl .Row >(schema , totalRows , buffer , stats );
826+ return new BigQueryResultImpl <BigQueryResultImpl .Row >(schema , totalRows , bufferRow , stats );
797827
798828 } catch (IOException e ) {
799829 throw BigQueryException .translateAndThrow (e );
@@ -827,8 +857,18 @@ private void processArrowStreamAsync(
827857
828858 } catch (Exception e ) {
829859 throw BigQueryException .translateAndThrow (e );
830- } finally {
831- markLast (buffer ); // marking end of stream
860+ } finally { // logic needed for graceful shutdown
861+ // marking end of stream
862+ try {
863+ buffer .put (
864+ new BigQueryResultImpl .Row (
865+ null , true )); // All the pages has been processed, put this marker
866+ } catch (InterruptedException e ) {
867+ logger .log (
868+ Level .WARNING ,
869+ "\n " + Thread .currentThread ().getName () + " Interrupted @ markLast" ,
870+ e );
871+ }
832872 queryTaskExecutor .shutdownNow (); // Shutdown the thread pool
833873 }
834874 };
@@ -890,7 +930,6 @@ private void processRows(
890930
891931 if (Thread .currentThread ().isInterrupted ()
892932 || queryTaskExecutor .isShutdown ()) { // do not process and shutdown
893- markLast (buffer ); // puts an isLast Row in the buffer for ResultSet to process
894933 break ; // exit the loop, root will be cleared in the finally block
895934 }
896935
@@ -981,9 +1020,6 @@ boolean isFastQuerySupported() {
9811020
9821021 @ VisibleForTesting
9831022 boolean useReadAPI (Long totalRows , Long pageRows , Schema schema , Boolean hasQueryParameters ) {
984-
985- // TODO(prasmish) get this logic review - totalRows and pageRows are returned null when the job
986- // is not complete
9871023 if ((totalRows == null || pageRows == null )
9881024 && Boolean .TRUE .equals (
9891025 connectionSettings
@@ -992,7 +1028,6 @@ boolean useReadAPI(Long totalRows, Long pageRows, Schema schema, Boolean hasQuer
9921028 return true ;
9931029 }
9941030
995- // Schema schema = Schema.fromPb(tableSchema);
9961031 // Read API does not yet support Interval Type or QueryParameters
9971032 if (containsIntervalType (schema ) || hasQueryParameters ) {
9981033 logger .log (Level .INFO , "\n Schema has IntervalType, or QueryParameters. Disabling ReadAPI" );
0 commit comments